Как выполнить несколько задач с помощью библиотеки параллельных задач и вернуться после первой, чтобы фактически вернуть данные

Я работаю над приложением, в котором важно как можно быстрее вернуть данные в последовательный процесс, но может быть несколько источников для получения этих данных. Кроме того, иногда один источник работает быстрее другого, но вы не знаете, какой это будет источник. Я использую ContinueWhenAny(...).Wait(), чтобы дождаться завершения первой задачи, чтобы продолжить и вернуться из вызывающего метода. Однако мне нужно сначала проверить достоверность данных, а только потом вернуться (или если все задачи завершены, и ни одна из них не имеет действительных данных). Прямо сейчас мой код будет возвращать даже недопустимые данные, если это задача, которая завершается первой.

Есть ли способ сделать что-то вроде «ContinueWhenAny», но только тогда, когда Task.Result соответствует определенному условию, иначе дождитесь следующей задачи/и т. д., пока не завершится последняя задача?

Кроме того, мне нужно убедиться, что после того, как один результат действителен, другие потоки отменяются. Эта часть уже работает нормально.

В настоящее время мой код выглядит так (без обработки исключений, только гайки и болты):

        ResultObject result = null;
        var tokenSource = new CancellationTokenSource();
        var tasks = listOfSources
                .Select(i => Task.Factory.StartNew(
                    () =>
                        {
                            i.CancellationToken = tokenSource.Token;
                            //Database Call
                            return i.getData(inputparameters);
                        }, tokenSource.Token));

        Task.Factory.ContinueWhenAny(
                tasks.ToArray(),
                firstCompleted =>
                    {
                        //This is the "result" I need to validate before setting and canceling the other threads
                        result = firstCompleted.Result;
                        tokenSource.Cancel();
                    }).Wait();
        return result;

Любые идеи? Я не хочу использовать ContinueWhenAll, поскольку, если первый вызов занимает 2 секунды, а второй — 10 секунд, я хочу вернуться к последовательному процессу через 2 секунды, если первый вызов возвращает действительные данные, иначе подождите 10 секунд, надеюсь, что result содержит допустимые данные и возвращает неверные данные только в том случае, если все задачи завершены и возвращают недопустимый результат.

--------- ОБНОВЛЕНИЕ ---- Спасибо zmbq за прекрасную идею. Обновленный (рабочий) код ниже и отвечает всем моим требованиям. Однако одно предостережение: разница между этим кодом и предыдущим кодом заключается в том, что этот код возвращает нулевой результат, если ни одна из задач не дает допустимого результата, а не предыдущий код, который сам возвращает недопустимый результат. Было бы несложно изменить и эту версию, чтобы сделать то же самое, но я вполне доволен возвратом null в этом случае для моих целей.

        ResultObject result = null;
        var tokenSource = new CancellationTokenSource();
        var tasks = listOfSources
                .Select(i => Task.Factory.StartNew(
                    () =>
                        {
                            i.CancellationToken = tokenSource.Token;
                            //Database Call
                            return i.getData(inputparameters);
                        }, tokenSource.Token)).ToArray();

        result = GetFirstValidResult(tokenSource,tasks);

        return result;


   private ResultObject GetFirstValidResult(CancellationTokenSource tokenSource, Task<ResultObject>[] tasks)
    {
        ResultObject result = null;
        Task.Factory.ContinueWhenAny(
            tasks,
            firstCompleted =>
                {
                    var testResult = firstCompleted.Result;
                    if(testResult != null && testResult.IsValid())
                    {
                        result = testResult;
                        tokenSource.Cancel();
                    }
                    else
                    {
                        var remainingTasks = tasks.Except(new[]{firstCompleted}).ToArray();
                        if(remainingTasks.Any())
                        {
                            result = GetFirstValidResult(tokenSource, remainingTasks);
                        } 
                    }
                }).Wait();
        return result;
    }

person Royi Hagigi    schedule 18.03.2012    source источник


Ответы (1)


Что ж, если ваш обратный вызов firstCompleted проверит результат и вызовет ContinueWhenAny для оставшихся задач в случае, если результат окажется недопустимым, все будет готово.

Как всегда, я рекомендую вам взглянуть на ZeroMQ. Запускайте задачи и заставляйте каждую задачу записывать сообщение в очередь вывода, если ее результат является допустимым. Основной поток заблокируется в очереди и вернется, когда будет действительное сообщение или когда все задачи уже завершены.

person zmbq    schedule 18.03.2012
comment
Спасибо! У меня есть уже написанный модульный тест для тестирования этого случая, поэтому я проверю и отмечу его как принятый, если он работает. - person Royi Hagigi; 18.03.2012
comment
Работает отлично! Мне пришлось переместить логику ContinueWhenAny в ее собственную функцию, чтобы она могла вызывать себя рекурсивно до тех пор, пока не закончатся задачи. Я включу обновленный код в вопрос для всех, кто хочет его увидеть. - person Royi Hagigi; 18.03.2012