Как я могу убедиться, что блок потока данных создает потоки только по запросу?

Я написал небольшой конвейер, используя API потока данных TPL, который получает данные из нескольких потоков и выполняет их обработку.

Настройка 1

Когда я настраиваю его для использования MaxDegreeOfParallelism = Environment.ProcessorCount (в моем случае это 8) для каждого блока, я замечаю, что он заполняет буферы в нескольких потоках, и обработка второго блока не начинается до тех пор, пока не будет получено +- 1700 элементов во всех потоках. Вы можете увидеть это в действии здесь.

Настройка 2

Когда я устанавливаю MaxDegreeOfParallelism = 1, я замечаю, что все элементы принимаются в одном потоке, и обработка отправки уже начинается после получения +- 40 элементов. Данные здесь.

Настройка 3

Когда я устанавливаю MaxDegreeOfParallelism = 1 и ввожу задержку в 1000 мс перед отправкой каждого ввода, я замечаю, что элементы отправляются сразу после их получения, и каждый полученный элемент помещается в отдельный поток. Данные здесь.


Пока установка. Мои вопросы следующие:

  1. Когда я сравниваю настройки 1 и 2, я замечаю, что обработка элементов начинается намного быстрее, когда они выполняются последовательно, по сравнению с параллельным (даже с учетом того факта, что параллельный поток имеет в 8 раз больше потоков). Что вызывает эту разницу?

  2. Поскольку это будет выполняться в среде ASP.NET, я не хочу создавать ненужные потоки, поскольку все они исходят из одного пула потоков. Как показано в настройке 3, он по-прежнему будет распределяться по нескольким потокам, даже если данных всего несколько. Это также удивительно, поскольку в настройке 1 я предполагал, что данные распределяются последовательно по потокам (обратите внимание, как все первые 50 элементов переходят в поток 16). Могу ли я убедиться, что он создает новые потоки только по требованию?

  3. Существует еще одна концепция, называемая BufferBlock<T>. Если TransformBlock<T> уже ставит входные данные в очередь, какая практическая разница в замене первого шага в моем конвейере (ReceiveElement) на BufferBlock?


class Program
{
    static void Main(string[] args)
    {
        var dataflowProcessor = new DataflowProcessor<string>();
        var amountOfTasks = 5;
        var tasks = new Task[amountOfTasks];

        for (var i = 0; i < amountOfTasks; i++)
        {
            tasks[i] = SpawnThread(dataflowProcessor, $"Task {i + 1}");
        }

        foreach (var task in tasks)
        {
            task.Start();
        }

        Task.WaitAll(tasks);
        Console.WriteLine("Finished feeding threads"); // Needs to use async main
        Console.Read();
    }

    private static Task SpawnThread(DataflowProcessor<string> dataflowProcessor, string taskName)
    {
        return new Task(async () =>
        {
            await FeedData(dataflowProcessor, taskName);
        });
    }

    private static async Task FeedData(DataflowProcessor<string> dataflowProcessor, string threadName)
    {
        foreach (var i in Enumerable.Range(0, short.MaxValue))
        {
            await Task.Delay(1000); // Only used for the delayedSerialProcessing test
            dataflowProcessor.Process($"Thread name: {threadName}\t Thread ID:{Thread.CurrentThread.ManagedThreadId}\t Value:{i}");
        }
    }
}


public class DataflowProcessor<T>
{
    private static readonly ExecutionDataflowBlockOptions ExecutionOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    private static readonly TransformBlock<T, T> ReceiveElement = new TransformBlock<T, T>(element =>
    {
        Console.WriteLine($"Processing received element in thread {Thread.CurrentThread.ManagedThreadId}");
        return element;
    }, ExecutionOptions);

    private static readonly ActionBlock<T> SendElement = new ActionBlock<T>(element =>
    {
        Console.WriteLine($"Processing sent element in thread {Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(element);
    }, ExecutionOptions);

    static DataflowProcessor()
    {
        ReceiveElement.LinkTo(SendElement);

        ReceiveElement.Completion.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                ((IDataflowBlock) ReceiveElement).Fault(x.Exception);
            }
            else
            {
                ReceiveElement.Complete();
            }
        });
    }


    public void Process(T newElement)
    {      
        ReceiveElement.Post(newElement);
    }
}

person Jeroen Vannevel    schedule 15.06.2016    source источник
comment
TPL Dataflow ничего не знает о потоках, он использует задачи. ThreadPool сопоставляет задачи с потоками. Вы можете изменить это, установив свой собственный TaskScheduler.   -  person i3arnon    schedule 15.06.2016
comment
@i3arnon Настоящая причина, по которой я пишу это, заключается в том, чтобы разгрузить работу, выполняемую в потоке запросов, на другие потоки, чтобы поток запросов был освобожден и мог двигаться дальше. Может быть, я неправильно к этому отношусь?   -  person Jeroen Vannevel    schedule 15.06.2016
comment
@JeroenVannevel, как уже упоминал l3arnon, поток данных TPL использует задачи, поэтому он освобождает поток сообщений. Ваш код слишком сложен. Простой публикации в начальном блоке и ожидания свойства завершения последнего блока должно быть достаточно.   -  person Panagiotis Kanavos    schedule 16.06.2016
comment
@PanagiotisKanavos Я не хочу создавать множество задач, которые все запрашивают потоки из пула потоков IIS. Если я правильно все понял, то, передавая его через поток данных, я могу существенно ограничить количество задач (или потоков?), созданных (MaxDegreeParallelism), при автоматической постановке в очередь других входящих данных. Я понял это неправильно?   -  person Jeroen Vannevel    schedule 17.06.2016
comment
Наверное, несколькими способами. Во-первых, задачи — это абстракция, а не сам поток. TaskScheduler — это то, что назначает задачи уже созданным потокам в пуле потоков. В любом случае MaxDOP=1 по умолчанию, каждый блок использует свою задачу. Вы должны вмешаться, чтобы изменить DOP на большее число. И даже MaxDOP=10 не означает, что вы получаете 10 потоков. Если обработка достаточно быстрая, вы можете использовать только 1 или 5 или что-то еще, что необходимо для обработки входного буфера.   -  person Panagiotis Kanavos    schedule 17.06.2016
comment
Наконец, ограничение буферов позволяет регулировать весь конвейер, заставляя восходящие блоки приостанавливаться, если входной буфер нижестоящего блока заполнен.   -  person Panagiotis Kanavos    schedule 17.06.2016
comment
Например, мне нужно позвонить в GDS, чтобы запросить пару тысяч записей об авиабилетах. Я использую большой DOP для шага запроса, потому что это просто медленные HTTP-запросы. На самом деле лимит зависит от того, сколько запросов в секунду может обрабатывать GDS. Однако этап синтаксического анализа записи имеет более строгие ограничения, потому что я не хочу загружать билеты быстрее, чем я могу их проанализировать.   -  person Panagiotis Kanavos    schedule 17.06.2016
comment
@PanagiotisKanavos Звучит именно то, что мне нужно. Прямо сейчас каждый входящий запрос хранит некоторые данные в нескольких точках на протяжении всего жизненного цикла запроса, а в самом конце запроса отправляет все эти данные в базу данных. Я хочу сделать это в другом потоке (или, по крайней мере, освободить поток запросов), поэтому я буду использовать задачи. Однако я не хочу создавать слишком много задач, потому что все они используют ThreadPool, а исчерпание потока - это настоящая проблема, когда у вас есть тысячи запросов, каждый из которых содержит более 30 вызовов db. Следовательно, я хочу задушить его и поставить в очередь другие вызовы БД.   -  person Jeroen Vannevel    schedule 17.06.2016
comment
Такое впечатление, что вы запускаете не один конвейер, а пять, и все они настроены с параллелизмом, равным количеству процессоров. Это не имеет особого смысла. Превышая подписку на системные ресурсы, вы снижаете эффективность всей системы. В ASP.NET вам редко нужно использовать более одного процессора для одного запроса, и если вы это сделаете, это будет для какого-то специального запроса привилегированного пользователя, а не для всех.   -  person Theodor Zoulias    schedule 26.06.2020


Ответы (1)


Прежде чем развертывать свое решение в среде ASP.NET, я предлагаю вам изменить архитектуру: IIS может приостанавливать потоки в ASP.NET для собственного использования после обработки запроса, поэтому ваша задача может остаться незавершенной. Лучшим подходом является создание отдельного демона службы Windows, который обрабатывает ваш поток данных.

Теперь вернемся к потоку данных TPL.

Мне нравится библиотека TPL Dataflow, но ее документация — настоящий беспорядок.
Единственный полезный документ, который я нашел, — это Введение в поток данных TPL.

В нем есть некоторые подсказки, которые могут быть полезны, особенно те, которые касаются настроек конфигурации (я предлагаю вам изучить реализацию вашей собственной TaskScheduler с использованием вашей собственной реализации TheadPool и опции MaxMessagesPerTask), если вам нужно:

Встроенные блоки потока данных настраиваются, при этом обеспечивается широкий контроль над тем, как и где блоки выполняют свою работу. Вот некоторые ключевые кнопки, доступные разработчику, и все они доступны через DataflowBlockOptions и производные от него типы (ExecutionDataflowBlockOptions и GroupingDataflowBlockOptions), экземпляры которых могут быть предоставлены блокам во время строительства.

  • #P6# <блочная цитата> #P7#
  • #P8# <блочная цитата> #P9#
  • #P10# <блочная цитата> #P11#
  • #P12# <блочная цитата> #P13#
  • #P14# <блочная цитата> #P15#
  • #P16# <блочная цитата> #P17#
  • #P18# <блочная цитата> #P19#
person VMAtm    schedule 15.06.2016
comment
Проблема не в документации, потому что Dataflow действительно прост. На самом деле проблема заключается в том, чтобы признать, что это это так просто и не требует каких-либо специальных трюков или настроек. Стивен Клири написал серию вводных сообщений в блоге также - person Panagiotis Kanavos; 16.06.2016
comment
@PanagiotisKanavos Не согласен: в практических руководствах мало говорится о настройках блоков, в документе, на который я ссылаюсь, говорится. Стивен Клири написал отличные вводные посты, а не OP нуждается в дополнительной настройке. - person VMAtm; 16.06.2016
comment
@PanagiotisKanavos Подробнее о вопросе: я думаю, что самое главное, что OP пытался использовать TPL внутри ASP.NET, что может быть подвержено ошибкам, поскольку потоки могут быть приостановлены через IIS. - person VMAtm; 16.06.2016