Асинхронный производитель / потребитель с регулируемой продолжительностью и пакетным потреблением

Я пытаюсь создать службу, которая предоставляет очередь для многих асинхронных клиентов, чтобы делать запросы и ждать ответа. Мне нужно регулировать обработку очереди с помощью X запросов на продолжительность Y. Например: 50 веб-запросов в секунду. Это для сторонней службы REST, где я могу отправлять только X запросов в секунду.

Нашел много SO-вопросов, это привело меня к использованию потока данных TPL, я использовал TranformBlock, чтобы обеспечить свое настраиваемое регулирование, а затем X количество ActionBlocks для параллельного выполнения задач. Реализация Action кажется немного неуклюжей, поэтому мне интересно, есть ли для меня лучший способ передать Задачи в конвейер, который уведомляет вызывающих абонентов о завершении.

Мне интересно, есть ли лучший или более оптимальный / простой способ делать то, что я хочу? Есть ли какие-то явные проблемы с моей реализацией? Я знаю, что здесь не хватает отмены и обработки исключений, и я сделаю это в следующий раз, но мы будем рады вашим комментариям.

У меня есть расширенный пример Стивена Клири для моего конвейера потока данных и использовал
концепцию TransformBlock с регулированием по времени Свика. Мне интересно, можно ли легко достичь того, что я создал, с помощью чистого дизайна SemaphoreSlim, его дросселирования на основе времени с максимальным операции, которые, я думаю, усложнят ситуацию.

Вот последняя реализация. FIFO queue async queue, в которой я могу передавать настраиваемые действия.

public class ThrottledProducerConsumer<T>
{
    private class TimerState<T1>
    {
        public SemaphoreSlim Sem;
        public T1 Value;
    }

    private BufferBlock<T> _queue;
    private IPropagatorBlock<T, T> _throttleBlock;
    private List<Task> _consumers;

    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval)
    {
        SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval);
        return new TransformBlock<T1, T1>(async (x) =>
        {
            var sw = new Stopwatch();
            sw.Start();
            //Console.WriteLine($"Current count: {_sem.CurrentCount}");
            await _sem.WaitAsync();

            sw.Stop();
            var now = DateTime.UtcNow;
            var releaseTime = now.Add(Interval) - now;

            //-- Using timer as opposed to Task.Delay as I do not want to await or wait for it to complete
            var tm = new Timer((s) => {
                var state = (TimerState<T1>)s;
                //Console.WriteLine($"RELEASE: {state.Value} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
                state.Sem.Release();

            }, new TimerState<T1> { Sem = _sem, Value = x }, (int)Interval.TotalMilliseconds,
            -1);

            /*  
            Task.Delay(delay).ContinueWith((t)=>
            {
                Console.WriteLine($"RELEASE(FAKE): {x} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
                //_sem.Release();
            });
            */

            //Console.WriteLine($"{x} was tramsformed in {sw.ElapsedMilliseconds}ms. Will release {now.Add(Interval):mm:ss:ff}");
            return x;
        },
             //new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
             //
             new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 });
    }

    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1)
    {
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true,  };

        //-- Create the Queue
        _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });

        //-- Create and link the throttle block
        _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
        _queue.LinkTo(_throttleBlock, linkOptions);

        //-- Create and link the consumer(s) to the throttle block
        var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
        _consumers = new List<Task>();
        for (int i = 0; i < MaxConsumers; i++)
        {
            var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
            _throttleBlock.LinkTo(consumer, linkOptions);
            _consumers.Add(consumer.Completion);
        }

        //-- TODO: Add some cancellation tokens to shut this thing down
    }

   /// <summary>
   /// Default Consumer Action, just prints to console
   /// </summary>
   /// <param name="ItemToConsume"></param>
    private void ConsumeItem(T ItemToConsume)
    {
        Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
    }

    public async Task EnqueueAsync(T ItemToEnqueue)
    {
        await this._queue.SendAsync(ItemToEnqueue);
    }

    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
    {
        foreach (var item in ItemsToEnqueue)
        {
            await this._queue.SendAsync(item);
        }
    }

    public async Task CompleteAsync()
    {
        this._queue.Complete();
        await Task.WhenAll(_consumers);
        Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
    }
}

Методика испытаний

    public class WorkItem<T>
    {
        public TaskCompletionSource<T> tcs;
        //public T respone;
        public string url;
        public WorkItem(string Url)
        {
            tcs = new TaskCompletionSource<T>();
            url = Url;
        }
        public override string ToString()
        {
            return $"{url}";
        }
    }

    public static void TestQueue()
    {
        Console.WriteLine("Created the queue");

        var defaultAction = new Action<WorkItem<String>>(async i => {
            var taskItem = ((WorkItem<String>)i);
            Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}");
            //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
            await Task.Delay(5000);
            taskItem.tcs.SetResult($"{taskItem.url}");
            //Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");
        });

        var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction);

        var results = new List<Task>();
        foreach (var no in Enumerable.Range(0, 20))
        {
            var workItem = new WorkItem<String>($"http://someurl{no}.com");
            results.Add(queue.EnqueueAsync(workItem));
            results.Add(workItem.tcs.Task);
            results.Add(workItem.tcs.Task.ContinueWith(response =>
            {
                Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}");
            }));
        }

        Task.WhenAll(results).Wait();
        Console.WriteLine("All Work Items Have Been Processed");
    }

person Nicholas    schedule 05.10.2016    source источник
comment
Некоторые мысли: 1) Вы используете C # и упоминаете веб-запросы; IIS в значительной степени это уже делает (включая регулирование, хотя, по общему признанию, на стороне ввода, а не вывода). 2) Возможно, что-то столь же простое, как ConcurrentQueue, - это все, что вам нужно для обработки параллелизма, с SemaphoreSlim для регулирования. 3) Если вы в конечном итоге собираетесь масштабировать это, особенно для нескольких машин, обслуживающих запросы, сервисная шина может быть лучшим вариантом? А вот с дросселированием было бы труднее.   -  person sellotape    schedule 05.10.2016
comment
Почему бы не добавить в конец очереди еще один блок, который будет уведомлять вызывающего абонента об обработанном запросе?   -  person VMAtm    schedule 05.10.2016
comment
@VMAtm Я предполагаю, что это способ, однако я думал о передаче делегата действия или даже задачи в очередь, которая уведомляла бы вызывающего по завершении.   -  person Nicholas    schedule 06.10.2016
comment
Замыкания очень потребляют память, поэтому вам, вероятно, не стоит их использовать в таком случае. Блок действий, определяющий по ключу, как он должен уведомлять вызывающего абонента, как по мне, намного предпочтительнее.   -  person VMAtm    schedule 06.10.2016
comment
@sellotape в идеале я бы переместил это в отдельную службу, но мне нужна быстрая победа, которая позволит мне создать очередь запросов, которые я могу хранить в памяти (кэшировать и инициализировать после каждого перезапуска домена приложения). У меня нет большого опыта работы с этими объектами, поэтому я немного учусь. Вышеупомянутое решение Datafow, похоже, делает то, что я хочу, с FIFIO, а другое преимущество - параллельное выполнение запросов, но я еще ничего не пометил. У вас есть еще причины, по которым мне не следует придерживаться вышеизложенного?   -  person Nicholas    schedule 06.10.2016
comment
@VMAtm О, я этого не осознавал, когда вы имеете в виду закрытие, вы говорите о задачах? Единственное, что я думал о задаче, это продолжение и отмена, я мог бы включить тайм-аут с запросом, чтобы, если он достигнет точки выполнения, а клиента больше нет, я мог бы отклонить запрос.   -  person Nicholas    schedule 06.10.2016
comment
Я думал, что вы собираетесь использовать замыкания в задачах, которые могут снизить производительность, только это я имел в виду. Итак, вы должны измерить свое решение   -  person VMAtm    schedule 06.10.2016
comment
Позвольте нам продолжить это обсуждение в чате.   -  person Nicholas    schedule 06.10.2016


Ответы (1)


С момента моего запроса я создал класс ThrottledConsumerProducer на основе потока данных TPL. Он был протестирован в течение нескольких дней, включая параллельных производителей, которые были поставлены в очередь и завершены по порядку, около 281 КБ без каких-либо проблем, однако есть ошибки, которых я не обнаружил.

  1. Я использую BufferBlock в качестве асинхронной очереди, это связано с:
  2. TransformBlock, который обеспечивает необходимое регулирование и блокировку. Он используется вместе с SempahoreSlim для управления максимальным количеством запросов. По мере того, как каждый элемент проходит через блок, он увеличивает семафор и планирует задачу для запуска X длительности позже, чтобы освободить семафор на единицу. Таким образом, у меня есть скользящее окно из X запросов на продолжительность; именно то, что я хотел. Из-за TPL я также использую параллелизм с подключенными:
  3. ActionBlock, которые отвечают за выполнение нужной мне задачи.

Классы являются общими, поэтому они могут быть полезны другим, если им понадобится что-то подобное. Я не писал об отмене или обработке ошибок, но подумал, что мне нужно просто отметить это как ответ, чтобы продолжить. Я был бы очень рад увидеть некоторые альтернативы и отзывы, а не отмечать мой как принятый ответ. Спасибо за прочтение.

ПРИМЕЧАНИЕ. Я удалил таймер из исходной реализации, поскольку он делал странные вещи, из-за которых семафор высвобождал больше, чем максимум, я предполагаю, что это ошибка динамического контекста, она произошла, когда я начал выполнять параллельные запросы . Я работал над этим, используя Task.Delay, чтобы запланировать снятие блокировки семафора.

Ограниченный производитель-потребитель

public class ThrottledProducerConsumer<T>
{
    private BufferBlock<T> _queue;
    private IPropagatorBlock<T, T> _throttleBlock;
    private List<Task> _consumers;

    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, 
        Int32 MaxPerInterval, Int32 BlockBoundedMax = 2, Int32 BlockMaxDegreeOfParallelism = 2)
    {
        SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval, MaxPerInterval);
        return new TransformBlock<T1, T1>(async (x) =>
        {
            //Log($"Transform blk: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count: {_sem.CurrentCount}");
            var sw = new Stopwatch();
            sw.Start();
            //Console.WriteLine($"Current count: {_sem.CurrentCount}");
            await _sem.WaitAsync();

            sw.Stop();
            var delayTask = Task.Delay(Interval).ContinueWith((t) =>
            {
                //Log($"Pre-RELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count {_sem.CurrentCount}");
                _sem.Release();
                //Log($"PostRELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphoere Count {_sem.CurrentCount}");
            });
            //},TaskScheduler.FromCurrentSynchronizationContext());                
            //Log($"Transformed: {x} in queue {sw.ElapsedMilliseconds}ms. {DateTime.Now:mm:ss:ff} will release {DateTime.Now.Add(Interval):mm:ss:ff} Semaphoere Count {_sem.CurrentCount}");
            return x;
        },
             //-- Might be better to keep Bounded Capacity in sync with the semaphore
             new ExecutionDataflowBlockOptions { BoundedCapacity = BlockBoundedMax,
                 MaxDegreeOfParallelism = BlockMaxDegreeOfParallelism });
    }

    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, 
        Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1, 
        Int32 MaxThrottleBuffer = 20, Int32 MaxDegreeOfParallelism = 10)
    {
        //-- Probably best to link MaxPerInterval and MaxThrottleBuffer 
        //  and MaxConsumers with MaxDegreeOfParallelism
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true,  };

        //-- Create the Queue
        _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });

        //-- Create and link the throttle block
        _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
        _queue.LinkTo(_throttleBlock, linkOptions);

        //-- Create and link the consumer(s) to the throttle block
        var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
        _consumers = new List<Task>();
        for (int i = 0; i < MaxConsumers; i++)
        {
            var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
            _throttleBlock.LinkTo(consumer, linkOptions);
            _consumers.Add(consumer.Completion);
        }

        //-- TODO: Add some cancellation tokens to shut this thing down
    }

   /// <summary>
   /// Default Consumer Action, just prints to console
   /// </summary>
   /// <param name="ItemToConsume"></param>
    private void ConsumeItem(T ItemToConsume)
    {
        Log($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
    }

    public async Task EnqueueAsync(T ItemToEnqueue)
    {
        await this._queue.SendAsync(ItemToEnqueue);
    }

    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
    {
        foreach (var item in ItemsToEnqueue)
        {
            await this._queue.SendAsync(item);
        }
    }

    public async Task CompleteAsync()
    {
        this._queue.Complete();
        await Task.WhenAll(_consumers);
        Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
    }
    private static void Log(String messageToLog)
    {
        System.Diagnostics.Trace.WriteLine(messageToLog);
        Console.WriteLine(messageToLog);
    }

}

- Пример использования -

Стандартный рабочий элемент

public class WorkItem<Toutput,Tinput>
{
    private TaskCompletionSource<Toutput> _tcs;
    public Task<Toutput> Task { get { return _tcs.Task; } }

    public Tinput InputData { get; private set; }
    public Toutput OutputData { get; private set; }

    public WorkItem(Tinput inputData)
    {
        _tcs = new TaskCompletionSource<Toutput>();
        InputData = inputData;
    }

    public void Complete(Toutput result)
    {
        _tcs.SetResult(result);
    }

    public void Failed(Exception ex)
    {
        _tcs.SetException(ex);
    }

    public override string ToString()
    {
        return InputData.ToString();
    }
}

Создание блока действий, выполняемого в конвейере

    private Action<WorkItem<Location,PointToLocation>> CreateProcessingAction()
    {
        return new Action<WorkItem<Location,PointToLocation>>(async i => {
            var sw = new Stopwatch();
            sw.Start();

            var taskItem = ((WorkItem<Location,PointToLocation>)i);
            var inputData = taskItem.InputData;

            //Log($"Consuming: {inputData.Latitude},{inputData.Longitude} {DateTime.UtcNow:mm:ss:ff}");

            //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
            await Task.Delay(500);
            sw.Stop();
            Location outData = new Location()
            {
                Latitude = inputData.Latitude,
                Longitude = inputData.Longitude,
                StreetAddress = $"Consumed: {inputData.Latitude},{inputData.Longitude} Duration(ms): {sw.ElapsedMilliseconds}"
            };
            taskItem.Complete(outData);
            //Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");
        });

    }

Метод тестирования. Вам потребуется предоставить собственную реализацию для PointToLocation и Location. Просто пример того, как вы могли бы использовать его со своими собственными классами.

    int startRange = 0;
    int nextRange = 1000;
    ThrottledProducerConsumer<WorkItem<Location,PointToLocation>> tpc;
    private void cmdTestPipeline_Click(object sender, EventArgs e)
    {
        Log($"Pipeline test started {DateTime.Now:HH:mm:ss:ff}");

        if(tpc == null)
        {
            tpc = new ThrottledProducerConsumer<WorkItem<Location, PointToLocation>>(
                //1010, 2, 20000,
                TimeSpan.FromMilliseconds(1010), 45, 100000,
                CreateProcessingAction(),
                2,45,10);
        }

        var workItems = new List<WorkItem<Models.Location, PointToLocation>>();
        foreach (var i in Enumerable.Range(startRange, nextRange))
        {
            var ptToLoc = new PointToLocation() { Latitude = i + 101, Longitude = i + 100 };
            var wrkItem = new WorkItem<Location, PointToLocation>(ptToLoc);
            workItems.Add(wrkItem);


            wrkItem.Task.ContinueWith(t =>
            {
                var loc = t.Result;
                string line = $"[Simulated:{DateTime.Now:HH:mm:ss:ff}] - {loc.StreetAddress}";
                //txtResponse.Text = String.Concat(txtResponse.Text, line, System.Environment.NewLine);
                //var lines = txtResponse.Text.Split(new string[] { System.Environment.NewLine},
                //    StringSplitOptions.RemoveEmptyEntries).LongCount();

                //lblLines.Text = lines.ToString();
                //Log(line);

            });
            //}, TaskScheduler.FromCurrentSynchronizationContext());

        }

        startRange += nextRange;

        tpc.EnqueueItemsAsync(workItems);

        Log($"Pipeline test completed {DateTime.Now:HH:mm:ss:ff}");
    }
person Nicholas    schedule 07.10.2016