Жизненный цикл этого шаблона аналогичен node.js, есть ли с ним какие-либо проблемы?

Я сейчас знакомлюсь с ключевым словом async в C#, а также с библиотечной экосистемой, работая над практическим решением.

Мои исследования привели меня к следующему статическому коду основного цикла:

Task loop = Task.Factory.StartNew(async () => {

    try {
        using(RedisConnection redis = new RedisConnection("localhost")) {

            var queue = "my_queue";
            var reserved = string.Concat(queue, "_reserved");

            redis.Open();

            while(true) {

                Task.Factory.StartNew(async () => {

                    var pushRequest = await redis.Lists.RemoveLastAndAddFirstString(0, queue, reserved);

                });

            }

        }
    }
    catch(Exception ex) {
        Console.Error.WriteLineAsync(ex.Message);
    }

}, cancellationToken);

loop.Wait();

Как видите, я пытаюсь создать неблокирующий рабочий процесс. Я чувствую, что приближаюсь к полному решению для самого кода основного цикла, и тогда, очевидно, то, что находится внутри внутреннего вызова Task.Factory.StartNew, начнет перемещаться в отдельные классы и методы.

У меня есть несколько вопросов:

  • Следует ли что-либо из того, что у меня есть здесь, быть перемещено из статического основного метода?
  • Я замечаю, что когда я запускаю этот код, активность моего процессора немного повышается, я полагаю, потому что я создаю несколько продолжений, запрашивающих информацию у Redis. Это нормально или надо смягчить?
  • Является ли это действительно асинхронным/неблокирующим на данный момент и будет ли код внутри моей внутренней лямбды никогда не связывать мой основной рабочий поток?
  • Создает ли С# дополнительные потоки для продолжений?

Извинения за то, что могут быть какие-либо любительские ошибки. Как я уже говорил, я все еще учусь, но надеюсь получить последнее руководство от сообщества.


person Alexander Trauzzi    schedule 06.03.2014    source источник
comment
Вы постоянно порождаете новых слушателей, навсегда. Это не очень хорошая идея.   -  person SLaks    schedule 07.03.2014
comment
Можете ли вы предложить, что или где я могу исправить, чтобы он пытался работать только при получении значения?   -  person Alexander Trauzzi    schedule 07.03.2014


Ответы (1)


Вам не нужны дополнительные потоки пула для операций, связанных с вводом-выводом (подробнее об этом здесь). В этом свете ваш рефакторинговый код может выглядеть следующим образом:

async Task AsyncLoop(CancellationToken cancellationToken)
{
    using (RedisConnection redis = new RedisConnection("localhost"))
    {

        var queue = "my_queue";
        var reserved = string.Concat(queue, "_reserved");

        redis.Open();

        while (true)
        {
            // observe cancellation requests
            cancellationToken.ThrowIfCancellationRequested();

            var pushRequestTask = redis.Lists.RemoveLastAndAddFirstString(0, queue, reserved);

            // continue on a random thread after await, 
            // thanks to ConfigureAwait(false)
            var pushRequest = await pushRequestTask.ConfigureAwait(false);

            // process pushRequest
        }
    }
}

void Loop(CancellationToken cancellationToken)
{
    try
    {
        AsyncLoop().Wait();
    }
    catch (Exception ex)
    {
        while (ex is AggregateException && ex.InnerException != null)
            ex = ex.InnerException;

        // might be: await Console.Error.WriteLineAsync(ex.Message),
        // but you cannot use await inside catch, so: 
        Console.Error.WriteLine(ex.Message);
    }
}

Обратите внимание: отсутствие Task.Run / Task.Factory.StartNew не означает, что всегда будет одна и та же цепочка. В этом примере код после await, скорее всего, будет продолжен в другом потоке из-за ConfigureAwait(false). Однако этот ConfigureAwait(false) может быть избыточным, если в вызывающем потоке нет контекста синхронизации (например, для консольного приложения). Чтобы узнать больше, обратитесь к статьям, перечисленным в async-await Wiki, в частности к "Все дело в контексте синхронизации" Стивена Клири.

Можно воспроизвести однопоточное поведение цикла обработки событий Node.js внутри AsyncLoop, поэтому все продолжения после await выполняются в одном и том же потоке. Вы должны использовать настраиваемый планировщик задач (или настраиваемые SynchronizationContext и TaskScheduler.FromCurrentSynchronizationContext). Это не сложно реализовать, хотя я не уверен, что это то, о чем вы просите.

В случае, если это приложение на стороне сервера, сериализация продолжений await в том же потоке в стиле Node.js может повредить масштабируемости приложения (особенно если между awaits есть какая-то работа, связанная с процессором).

Чтобы ответить на ваши конкретные вопросы:

Следует ли что-либо из того, что у меня есть здесь, быть перемещено из статического основного метода?

Если вы используете цепочку методов async, в самом верхнем кадре стека будет переход от асинхронного к Task. В случае с консольным приложением можно блокировать Task.Wait() внутри Main, что является самой верхней точкой входа.

Я замечаю, что когда я запускаю этот код, активность моего процессора немного повышается, я полагаю, потому что я создаю несколько продолжений, запрашивающих информацию у Redis. Это нормально или надо смягчить?

Трудно точно сказать, почему повышается активность вашего процессора. Я бы предпочел обвинить серверную часть этого приложения, то, что прослушивает и обрабатывает запросы Redis на localhost.

Является ли это действительно асинхронным/неблокирующим на данный момент и будет ли код внутри моей внутренней лямбды никогда не связывать мой основной рабочий поток?

Код, который я разместил, действительно неблокирующий, при условии, что new RedisConnection не блокирует, а RemoveLastAndAddFirstString не блокирует нигде внутри своей синхронной части.

Создает ли С# дополнительные потоки для продолжений?

Не явно. Поток отсутствует, пока асинхронная операция ввода-вывода находится в процессе выполнения. Однако, когда операция будет завершена, будет поток IOCP из ThreadPool, назначенный для обработки завершения. Будет ли код после await продолжаться в этом потоке или в исходном потоке, зависит от контекста синхронизации. Как уже упоминалось, этим поведением можно управлять с помощью ConfigureAwait(false).

person noseratio    schedule 07.03.2014
comment
Отличная информация, большое спасибо! Я думаю, что также стоит упомянуть, что я думаю, что, хотя узел является однопоточным, то, что я читаю, указывает на то, что это просто рабочий процесс, который никогда не блокируется. Другие операции могут иногда приводить к созданию нового потока, это просто основная логика, которая никогда не задерживается. Это, вероятно, делает предложенный вами код почти идентичным подходу node.js? - person Alexander Trauzzi; 07.03.2014
comment
Кроме того, быстрое замечание. Должен ли метод AsyncLoop где-то возвращать задачу? - person Alexander Trauzzi; 07.03.2014
comment
@Omega, действительно, рабочий будет блокировать только в том случае, если где-либо есть блокирующая часть (например, синхронное разрешение DNS). Всякий раз, когда это происходит и нет подходящего естественного асинхронного API для использования, вы можете обернуть такие вызовы с помощью await Task.Run(). Таким образом, AsyncLoop похож на цикл обработки событий Node, кроме того, для продолжений await нет привязки к потоку. AsyncLoop не должен ничего возвращать явно, потому что есть async Task подпись (async отсутствовала, исправлена). Компилятор C# заставляет его возвращать Task. Без async это было бы необходимо, но вы не могли бы использовать await внутри него. - person noseratio; 08.03.2014
comment
Большое спасибо за вашу помощь, здесь много ясности. - person Alexander Trauzzi; 08.03.2014
comment
Последний вопрос. Во время повторного запуска я заметил, что процесс моей программы увеличил активность процессора и начал потреблять много памяти. Безопасно ли запускать await pushRequestTask? Это похоже на то, что он будет называть это снова и снова? - person Alexander Trauzzi; 08.03.2014
comment
@ Омега, это безопасно. Использование ЦП и памяти действительно зависит от того, что происходит внутри `RemoveLastAndAddFirstString. Try await Task.Delay(500)`, и сравните свой пробег. - person noseratio; 08.03.2014