Распределитель Slab - это основной модуль кэш-системы, который в значительной степени определяет, насколько эффективно может использоваться узкое место - память. Остальные 3 части, а именно алгоритм LRU истечения срока входа; и модель, управляемая событиями, основанная на libevent; и постоянная жесткость в отношении распределения данных построены вокруг этого.

Плита I
Плита II
Плита III
LRU I
LRU II
LRU III
Событие управляемый I
Управляемый событиями II (эта статья)

В классической многопоточности блокирующие операции ввода-вывода ограничивают максимальное количество запросов, которые может обработать сервер. Следовательно, для устранения узких мест с пропускной способностью используется асинхронная управляемая событиями модель. Таким образом, синхронный и потенциально медленный процесс делится на логические сегменты, свободные от операций ввода-вывода и выполняемые асинхронно.

Когда дело доходит до асинхронизации, требуется дополнительное пространство для хранения контекста. Это связано с тем, что сегменты логики, которые могут быть связаны с разными сеансами, выполняются с чередованием. Например, в случае, когда асинхронизация реализуется (эмулируется) с использованием синхронной многопоточности, «дополнительное пространство» находится в форме стека потоков. В то время как контекст поддерживается на уровне пользователя в событийном режиме.

conn является представителем этих контекстов в Memcached.

Основная структура данных - conn

typedef struct conn conn;
struct conn {
    int    sfd;

...// scr: not applicable

    enum conn_states  state;

...// scr: not applicable

    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    /** which state to go into after finishing current write */
    enum conn_states  write_and_go;
    void   *write_and_free; /** free this memory after finishing writing */

    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;

...// scr: not applicable

    enum protocol protocol;   /* which protocol this connection speaks */

...// scr: not applicable

    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int    hdrsize;   /* number of headers' worth of space is allocated */

    bool   noreply;   /* True if the reply should not be sent. */

...// scr: not applicable

    short cmd; /* current command being processed */

...// scr: not applicable

    int keylen;
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};

Недвижимость в использовании

fd - файловый дескриптор с корневым событием. используется в последнем посте

state - основная тема этого поста

rbytes - размер обрабатываемых данных. инициализируется try_read_network, обновляется process_get_command, используется try_read_command

rsize - текущий размер буфера чтения. используется try_read_network

rbuf - адрес буфера чтения. используется try_read_network

rcurr - адрес необработанных данных. используется try_read_network

last_cmd_time - обновляется при запуске обработки команды. используется try_read_network

ilist - список элементов, связанных с контекстом; icurr и ileft указывают текущую запись и количество оставшихся записей. используется process_get_command, conn_release_items

iov - фактическое хранилище указателей выходных данных, которое использует msglist; iovsize и iovused - это выделенный и используемый размер соответственно. инициализируется process_command, используется add_iov, sure_iov_space

Здесь структуры данных (struct msghdr и struct iovec) требуются sendmsg. Соответствующий текст об API вставлен ниже.

Поля msg_iov и msg_iovlen сообщения определяют ноль или более буферов, содержащих данные для отправки. msg_iov указывает на массив структур iovec; msg_iovlen должен быть установлен в размер этого массива. В каждой структуре iovec поле iov_base определяет область хранения, а поле iov_len дает ее размер в байтах. Некоторые из этих размеров могут быть нулевыми. Данные из каждой области хранения, указанной msg_iov, отправляются по очереди…

msglist - список, в котором хранятся сами struct msghdr; msgsize и msgused - это выделенный и используемый размер соответственно; msgbytes указывает общий размер размера выходных данных; msgcurr указывает на индекс, который был обработан (записан).

И все же нет ничего лучше, чем диаграмма, демонстрирующая структуры данных и расположение в памяти.

Переключатель состояния

Событие вызывает каскадные изменения state, которые, в свою очередь, вызывают различные процедуры, прежде чем приводная машина откажется от управления и ожидает прибытия нового события. В последнем посте мы видели этот процесс в ветке отправки, в которой

1) conn_listening запускается новым подключением;

2) вызывается dispatch_conn_new, который передает новое принятое fd, а также последующие события в один из рабочих потоков;

3) поток отправки отказывается от ЦП и ждет нового события «нового соединения».

В этом посте мы увидим более сложные переключатели состояний, которые эффективно связывают вместе процедуры, которые мы обсуждали в LRU III,

Состояние данного сеанса представлено conn.state связанного с ним контекста.

и на этот раз мы собираемся применить подход, аналогичный LRU III, то есть отправив команды telnet в экземпляр Memcached для навигации по самому внешнему уровню Приложение Memcached.

Мы также включим удобную подробную информацию с помощью -vvv, чтобы лучше наблюдать за внутренними переходами состояний.

Читать

Сначала (как обычно) мы telnet подключаемся к экземпляру Memcached и добавляем несколько элементов

…
~telnet localhost 11211
Trying 127.0.0.1…
Connected to localhost.
Escape character is ‘^]’.
…

Вывод Memcached

…
< 36 new auto-negotiating client connection
…

Здесь 36 - принятый fd. Как уже упоминалось, с этим fd будут выполняться следующие операции.

Затем мы отправляем в экземпляр Memcached ту же команду чтения, что и в LRU III.

> get test

Вывод Memcached

…
36: going from conn_new_cmd to conn_waiting
36: going from conn_waiting to conn_read
36: going from conn_read to conn_parse_cmd
36: Client using the ascii protocol
< 36 get test
> FOUND KEY test
> 36 sending key test
> 36 END
36: going from conn_parse_cmd to conn_mwrite
36: going from conn_mwrite to conn_new_cmd
36: going from conn_new_cmd to conn_waiting
36: going from conn_waiting to conn_read
…

Как упоминалось в последнем посте, начальное состояние рабочего потока - это

conn_new_cmd

так что мы начнем отсюда.

...
static void drive_machine(conn *c) {
    int nreqs = settings.reqs_per_event; // scr: --------> 1)
...
        case conn_new_cmd:
            /* Only process nreqs at a time to avoid starving other
               connections */
            --nreqs; // scr: ----------------------------> 1)
            if (nreqs >= 0) {
                reset_cmd_handler(c); // scr: -----------> 2)
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {
...// scr: error handling
                }
                stop = true; // scr: --------------------> 3)
            }
            break;
...

static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if(c->item != NULL) {
        item_remove(c->item);
        c->item = NULL;
    }
    conn_shrink(c);
    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd); // scr: -----> 2a)
    } else {
        conn_set_state(c, conn_waiting); // scr: -------> 2b)
    }
}

1) nreqs (settings.reqs_per_event) - максимальное количество запросов, которые должна обрабатывать одна итерация цикла событий. Обратите внимание, что порог необходим, потому что новые соединения не будут обрабатываться, если одна итерация цикла событий занимает слишком много времени для завершения. Также обратите внимание, что прерванное соединение будет запущено снова и получит возможность войти в дисковый компьютер с событием чтение, поскольку дескриптор установлен с EV_PERSIST в последнем сообщении.

2) Сбросьте соответствующие свойства в контексте для новой команды.

2a) Обрабатывает случай, когда управляемое событие вводится с событием «чтение».

2b) Обрабатывает случай, когда управляемое событие вводится с событием «новое соединение».

3) Вывести текущую итерацию при достижении порога.

conn_waiting

...
        case conn_waiting:
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
            stop = true;
            break;
...

Просто сбросьте дескриптор с исходными флагами (т.е. EV_READ, EV_PERSIST), обновите состояние context до следующего перехода (conn_read) и освободите CPU.

conn_read

...
case conn_read:
    res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c); //1)

    switch (res) {
    case READ_NO_DATA_RECEIVED:
        conn_set_state(c, conn_waiting);
        break;
    case READ_DATA_RECEIVED:
        conn_set_state(c, conn_parse_cmd); // scr: -------------> 2)
        break;
    case READ_ERROR:
        conn_set_state(c, conn_closing);
        break;
    case READ_MEMORY_ERROR: /* Failed to allocate more memory */
        /* State already set by try_read_network */
        break;
    }
    break;
...

1) Прочтите из файлового дескриптора и сохраните данные в контексте.

2) Перейти в следующее состояние conn_parse_cmd.

try_read_network

static enum try_read_result try_read_network(conn *c) {
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
    int num_allocs = 0;
    assert(c != NULL);

    if (c->rcurr != c->rbuf) { // scr: -------------------------> 1)
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }

    while (1) {
        if (c->rbytes >= c->rsize) { // scr: -------------------> 2)
            if (num_allocs == 4) {
                return gotdata;
            }
            ++num_allocs;
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
            if (!new_rbuf) {
...// scr: error handling
            }
            c->rcurr = c->rbuf = new_rbuf;
            c->rsize *= 2;
        }

        int avail = c->rsize - c->rbytes; // scr: --------------> 3)
        res = read(c->sfd, c->rbuf + c->rbytes, avail);
        if (res > 0) {
...// scr: stat
            gotdata = READ_DATA_RECEIVED;
            c->rbytes += res;
            if (res == avail) { // scr: -----------------------> 3a)
                continue;
            } else {
                break; // scr: --------------------------------> 3b)
            }
        }
        if (res == 0) {
            return READ_ERROR;
        }
        if (res == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) { //src:3b)
                break;
            }
            return READ_ERROR;
        }
    }
    return gotdata;
}

Здесь while (1) используется для обработки логического потока для расширения буфера вместо цикла.

1) Переместите rcurr в начало буфера чтения.

2) Если размер данных превышает размер буфера чтения, попробуйте расширить буфер (максимум в 4 раза).

3) Вычислить доступное буферное пространство для чтения из сокета и соответственно обновить rbytes.

3a) Перейти к 2), если буфер заполнен.

3b) Возврат READ_DATA_RECEIVED, который переключает состояние на conn_parse_cmd в конечном автомате.

conn_parse_cmd

...
case conn_parse_cmd :
    if (try_read_command(c) == 0) {
        /* wee need more data! */
        conn_set_state(c, conn_waiting);
    }

    break;
...

try_read_command

static int try_read_command(conn *c) {
    assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));
    assert(c->rbytes > 0);

    if (c->protocol == negotiating_prot || c->transport == udp_transport)  {
        if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
            c->protocol = binary_prot;
        } else {
            c->protocol = ascii_prot; // scr: ------------------> 1)
        }

        if (settings.verbose > 1) { // scr: --------------------> ~)
            fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
                    prot_text(c->protocol));
        }
    }

    if (c->protocol == binary_prot) {
...// scr: not applicable
    } else {
        char *el, *cont;

        if (c->rbytes == 0)
            return 0;

        el = memchr(c->rcurr, '\n', c->rbytes); // scr: --------> 2)
        if (!el) {
...// scr: not applicable
        }
        cont = el + 1;
        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
            el--;
        }
        *el = '\0'; // scr: ------------------------------------> 2)

        assert(cont <= (c->rcurr + c->rbytes));

        c->last_cmd_time = current_time; // scr: ---------------> 3)
        process_command(c, c->rcurr); // scr: ------------------> 4)

        c->rbytes -= (cont - c->rcurr); // scr: ----------------> 5)
        c->rcurr = cont; // scr: -------------------------------> 6)

        assert(c->rcurr <= (c->rbuf + c->rsize));
    }

    return 1;
}

1) Определите тип протокола, в данном случае ascii_prot.

~) Подробное сообщение, которое мы видели в начале.

2) Обрежьте все '\n' и '\r' в конце, сохраните позицию последнего символа команды в el и сохраните конец команды в cont.

3) Обновите last_cmd_time.

4) Вызов process_command, который определяет команду получить, и вызов process_get_command. В process_command, а) tokenize_command - это метод синтаксического анализа строки, который сохраняет команду (т. Е. Получить) в tokens[COMMAND_TOKEN] и ключ (т. Е. Тест) в tokens[KEY_TOKEN]; б) инициализация msgcurr, msgused, iovused; в) инициализация остальных полей в add_msghdr; и г) process_get_command - следующий шаг.

5) Обновите rbytes, указав длину обработанной команды (cont - c->rcurr).

6) Переместите rcurr к необработанным данным, расположенным в конце части команды.

add_msghdr

Прежде чем логика дойдет до process_get_command, в msglist должна быть инициализирована запись для текущей команды.

static int add_msghdr(conn *c)
{
    struct msghdr *msg;

    assert(c != NULL);

    if (c->msgsize == c->msgused) { // scr: --------------------> 1)
        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
        if (! msg) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return -1;
        }
        c->msglist = msg;
        c->msgsize *= 2;
    }

    msg = c->msglist + c->msgused; // scr: ---------------------> 2)

    /* this wipes msg_iovlen, msg_control, msg_controllen, and
       msg_flags, the last 3 of which aren't defined on solaris: */
    memset(msg, 0, sizeof(struct msghdr)); // scr: -------------> 3)

    msg->msg_iov = &c->iov[c->iovused]; // scr: ----------------> 3)

    if (IS_UDP(c->transport) && c->request_addr_size > 0) {
...// scr: UDP related
    }

    c->msgbytes = 0; // scr: -----------------------------------> 4)
    c->msgused++; // scr: --------------------------------------> 5)

    if (IS_UDP(c->transport)) {
...// scr: UDP related
    }

    return 0;
}

1) При необходимости разверните msglist.

2) Укажите на следующую пустую запись в msglist с помощью msg.

3) Инициализируйте запись, указанную msg. Здесь критической операцией является msg->msg_iov = &c->iov[c->iovused];, которая связывает msglist с конкретной записью в iov. (Рисунок - msglist & iov)

4) Инициализируйте msgbytes на 0.

5) Обновите msgused соответственно.

process_get_command

Мы видели этот метод в начале LRU III. На этот раз мы завершим его прохождение в контексте событийного управления.

static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
    char *key;
    size_t nkey;
    int i = 0;
    item *it;
    token_t *key_token = &tokens[KEY_TOKEN];
    char *suffix;
    assert(c != NULL);

    do {
        while(key_token->length != 0) { // scr: ----------------> 1)

            key = key_token->value;
            nkey = key_token->length;

            if(nkey > KEY_MAX_LENGTH) {
                out_string(c, "CLIENT_ERROR bad command line format");
                while (i-- > 0) {
                    item_remove(*(c->ilist + i));
                }
                return;
            }

            it = item_get(key, nkey, c); // scr: ---------------> 2)
            if (settings.detail_enabled) {
                stats_prefix_record_get(key, nkey, NULL != it);
            }
            if (it) {
                if (i >= c->isize) { // scr: -------------------> 3)
                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
                    if (new_list) {
                        c->isize *= 2;
                        c->ilist = new_list;
                    } else {
...// scr: stat
                        item_remove(it);
                        break;
                    }
                }

                if (return_cas)
                {
...// scr: cas
                }
                else
                {
                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                                        it->nbytes, ITEM_get_cas(it));
                  if (add_iov(c, "VALUE ", 6) != 0 || // scr: --> 4)
                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
                      {
                          item_remove(it);
                          break;
                      }
                }

...// scr: verbose & stat

                item_update(it); // scr: -----------------------> 5)
                *(c->ilist + i) = it; // scr: ------------------> 6)
                i++;

            } else {
...// scr: stat
            }

            key_token++; // scr: -------------------------------> 1)
        }

        /*
         * If the command string hasn't been fully processed, get the next set
         * of tokens.
         */
        if(key_token->value != NULL) { // scr: -----------------> 1)
            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
            key_token = tokens;
        }

    } while(key_token->value != NULL);

    c->icurr = c->ilist; // scr: -------------------------------> 6)
    c->ileft = i; scr: -----------------------------------------> 6)
...// scr: cas & verbose

    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
        out_of_memory(c, "SERVER_ERROR out of memory writing get response");
    }
    else { // scr: ---------------------------------------------> 7)
        conn_set_state(c, conn_mwrite);
        c->msgcurr = 0;
    }
}

1) Перебрать массив ключевых токенов. Здесь у нас есть один ключевой токен «test».

2) Вызвать item_get для указателя item.

3) Увеличьте размер ilist, если он заполнен, и. Здесь ilist хранит элемент, который обрабатывается. В конце обработки текущей команды этот список используется для пакетного освобождения счетчиков ссылок элемента.

4) add_iov подготавливает вывод этого сеанса.

5) Вызовите item_update, чтобы управлять списками LRU.

6) Свяжите элемент, который в настоящее время обрабатывается, с ilist и обновите связанные поля.

7) Перейти к следующему состоянию conn_mwrite.

add_iov

static int add_iov(conn *c, const void *buf, int len) {
    struct msghdr *m;
    int leftover;
    bool limit_to_mtu;

    assert(c != NULL);

    do {
        m = &c->msglist[c->msgused - 1]; // scr: ---------------> 1)

        /*
         * Limit UDP packets, and the first payloads of TCP replies, to
         * UDP_MAX_PAYLOAD_SIZE bytes.
         */
        limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);

        /* We may need to start a new msghdr if this one is full. */
        if (m->msg_iovlen == IOV_MAX ||
            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
            add_msghdr(c);
            m = &c->msglist[c->msgused - 1]; // scr: -----------> 7)
        }

        if (ensure_iov_space(c) != 0) // scr: ------------------> 2)
            return -1;

        /* If the fragment is too big to fit in the datagram, split it up */
        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE; //scr*)
            len -= leftover;
        } else {
            leftover = 0;
        }

        m = &c->msglist[c->msgused - 1]; // scr: ---------------> 1)
        m->msg_iov[m->msg_iovlen].iov_base = (void *)buf; // scr: -> 3)
        m->msg_iov[m->msg_iovlen].iov_len = len;

        c->msgbytes += len; // scr: ----------------------------> 4)
        c->iovused++; // scr: ----------------------------------> 5)
        m->msg_iovlen++; // scr: -------------------------------> 6)

        buf = ((char *)buf) + len;
        len = leftover;
    } while (leftover > 0);

    return 0;
}

Этот метод инициализирует запись в списке iov и добавляет ее к последнему используемому элементу в списке сообщений (Рисунок - список сообщений и iov).

1) Получите информацию об используемой части msglist.

2) При необходимости израсходуйте iov.

3) Инициализируйте поля iov_base и iov_len в записи iov. Обратите внимание, что msg_iov был связан с позицией конкретной записи в iov, поэтому операции с msg_iov также изменяют содержимое iov.

4) Обновите msgbytes, указав общий размер элемента.

5, 6) Обновите iovused и msg_iovlen соответственно.

7) Обработайте MTU с помощью цикла do while.

conn_mwrite

...
        case conn_mwrite:
          if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
...// scr: UDP related
          }
            switch (transmit(c)) {
...// scr: state processing
            }
            break;
...

Прежде чем объяснять логику процесса состояния conn_mwrite, мы сначала рассмотрим самое главное, а именно:

передавать

static enum transmit_result transmit(conn *c) {
    assert(c != NULL);

    if (c->msgcurr < c->msgused &&
            c->msglist[c->msgcurr].msg_iovlen == 0) { // scr: --> 1)
        /* Finished writing the current msg; advance to the next. */
        c->msgcurr++;
    }
    if (c->msgcurr < c->msgused) { // scr: ---------------------> 2)
        ssize_t res;
        struct msghdr *m = &c->msglist[c->msgcurr];
            
        res = sendmsg(c->sfd, m, 0); 
        if (res > 0) {
...// scr: state
            /* We've written some of the data. Remove the completed
               iovec entries from the list of pending writes. */
            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
                res -= m->msg_iov->iov_len;
                m->msg_iovlen--;
                m->msg_iov++;
            }   

            /* Might have written just part of the last iovec entry;
               adjust it so the next write will do the rest. */
            if (res > 0) {
                m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
                m->msg_iov->iov_len -= res;
            }
            return TRANSMIT_INCOMPLETE;
        }
        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // scr: ------------------------------------------------------> 3)
            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                return TRANSMIT_HARD_ERROR;
            }
            return TRANSMIT_SOFT_ERROR;
        }
        /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
           we have a real error, on which we close the connection */
        if (settings.verbose > 0)
            perror("Failed to write, and not due to blocking");

        if (IS_UDP(c->transport))
...// scr: UDP related
        else
            conn_set_state(c, conn_closing);
        return TRANSMIT_HARD_ERROR; // scr: --------------------> 4)
    } else {
        return TRANSMIT_COMPLETE; // scr: ----------------------> 5)
    }
}

В качестве основного метода обработки состояния conn_mwrite transmit проходит msglist (начиная с 0, начального значения) и изо всех сил пытается отправить все ожидающие данные, накопленные в текущем сеансе. Это делается внутри себя или в последующих проходах через цикл обработки событий. Только когда операция блокировки обозначена EAGAIN или EWOULDBLOCK, конечный автомат останавливает текущую итерацию цикла обработки событий, и тот же сеанс будет возобновлен, когда буферное пространство снова станет доступным.

1) Если msg_iovlen равно 0, запись msgcurr слота завершена, следовательно, переход к следующему слоту.

2) Вызов sendmsg и перемещает msg_iov, iov_base и iov_len в соответствии с длиной данных (res), которые были успешно записаны. Это приводит к случаю б) обработки состояния.

3) Как уже упоминалось, EAGAIN или EWOULDBLOCK, возвращенные sendmsg, приводят к случаю c) обработки состояния.

4) Ошибки, отличные от двух вышеупомянутых, приводят к случаю c) обработки состояния.

5) c->msgcurr >= c->msgused означает, что write все данные сеанса завершены, что приводит к б) обработки состояния.

Вернуться к государственной обработке

...
        case conn_mwrite:
          if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
...// scr: UDP related
          }
            switch (transmit(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) { // scr: ---------> a)
                    conn_release_items(c);
                    /* XXX:  I don't know why this wasn't the general case */
                    if(c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {
                        conn_set_state(c, conn_new_cmd);
                    }
                } else if (c->state == conn_write) {
...// scr: not applicable
                } else {
...// scr: not applicable
                }
                break;

            case TRANSMIT_INCOMPLETE: // scr: ------------------> b)
            case TRANSMIT_HARD_ERROR:
                break;                   /* Continue in state machine. */

            case TRANSMIT_SOFT_ERROR: // scr: ------------------> c)
                stop = true;
                break;
            }
            break;
...

Согласно результату transmit, логика переходит в следующие 3 ветви:

а) Если результат TRANSMIT_COMPLETE, 1) завершить обработку текущей команды с помощью conn_release_items; 2) переключите state на conn_new_cmd, который 3) в конечном итоге упадет до conn_waiting и, как обсуждалось, завершит текущий цикл обработки событий.

б) Если результатом являются TRANSMIT_INCOMPLETE и TRANSMIT_HARD_ERROR, конечный автомат сохраняет то же состояние, и последующие проходы через цикл событий продолжают потреблять больше данные в msglist. В отличие от операции read, TRANSMIT_INCOMPLETE не приводит к немедленному завершению цикла обработки событий, поскольку операция записи не блокируется, пока буфер не заполнится.

c) TRANSMIT_SOFT_ERROR означает, что буфер заполнен, поэтому немедленно завершить текущую итерацию цикла обработки событий.

Закончить читать

static void conn_release_items(conn *c) {
    assert(c != NULL);

    if (c->item) {
...// scr: not applicable
    }

    while (c->ileft > 0) {
        item *it = *(c->icurr);
        assert((it->it_flags & ITEM_SLABBED) == 0);
        item_remove(it); // scr: ---------------------> 1)
        c->icurr++;
        c->ileft--;
    }

...// scr: cas

    c->icurr = c->ilist;
...// scr: cas
}

Не уверен, заметили вы или нет, но в LRU III есть небольшая ошибка, счетчик ссылок элемента в операции чтение не возвращается к 0 как и в других операциях. Это потому что

1) все права собственности на элемент освобождаются здесь в конце обработки команды (read).

Изначально опубликовано на holmeshe.me.