Как обрабатывать объединение нескольких асинхронных CompletableFuture?

Я читал, что CompletableFuture может объединять несколько фьючерсов с runAfterBoth, но что, если я хочу объединить более двух?

CompletableFuture<Boolean> a = new CompletableFuture<>();
CompletableFuture<Boolean> b = new CompletableFuture<>();
CompletableFuture<Boolean> c = new CompletableFuture<>();

List<CompletableFuture<Boolean>> list = new LinkedList<>();

list.add(a);
list.add(b);
list.add(c);

// Could be any number
for (CompletableFuture<Boolean> f : list) {
   f.runAfter..
}

Мой вариант использования заключается в том, что я отправляю сообщения в несколько сокетов, чтобы найти один объект, который может быть или не быть ни в одном из них.

В настоящее время я рассматриваю это как решение:

CompletableFuture<Boolean> a = new CompletableFuture<>();
CompletableFuture<Boolean> b = new CompletableFuture<>();
CompletableFuture<Boolean> c = new CompletableFuture<>();

List<CompletableFuture<Boolean>> list = new LinkedList<>();

list.add(a);
list.add(b);
list.add(c);

CompletableFuture<Boolean> result = new CompletableFuture<>();

Thread accept = new Thread(() -> {
   for (CompletableFuture<Boolean> f : list)
      if (f.join() != null)
         result.complete(f.join());
});

accept.start();

// Actual boolean value returned
result.get();

Но это какой-то беспорядок. И в моем случае я хочу продолжить обработку, как только получу действительный результат (не нулевой), вместо того, чтобы ждать недопустимых результатов.

Например, a занимает 5 секунд, и цикл ожидает его, хотя b уже завершился через 2 секунды; но цикл не знает об этом, потому что он все еще ожидает a.

Есть ли шаблон для работы с объединением нескольких асинхронных фьючерсов, когда я могу немедленно ответить на успешное завершение?

Другая возможность:

public static class FutureUtil {
public static <T> CompletableFuture<T> anyOfNot(
   Collection<CompletableFuture<T>> collection,
   T value,
   T defaultValue)
{
   CompletableFuture<T> result = new CompletableFuture<>();

   new Thread(() -> {
      for (CompletableFuture<T> f : collection) {
         f.thenAccept((
            T r) -> {
            if ((r != null && !r.equals(value))
               || (value != null && !value.equals(r)))
               result.complete(r);
         });
      }

      try {
         for (CompletableFuture<T> f : collection)
            f.get();
      }
      catch (Exception ex) {
         result.completeExceptionally(ex);
      }

      result.complete(defaultValue);
   }).start();

   return result;
}
}

Пример использования:

CompletableFuture<Boolean> a = new CompletableFuture<>();
CompletableFuture<Boolean> b = new CompletableFuture<>();
CompletableFuture<Boolean> c = new CompletableFuture<>();

List<CompletableFuture<Boolean>> list = new LinkedList<>();

list.add(a);
list.add(b);
list.add(c);

CompletableFuture<Boolean> result = FutureUtil.anyOfNot(list, null, false);

result.get();

person Zhro    schedule 10.12.2016    source источник
comment
Вы ищете CompletableFuture.allOf()?   -  person teppic    schedule 10.12.2016
comment
Вроде, как бы, что-то вроде. Я не хочу ждать других фьючерсов, если один из них уже завершился с допустимым результатом (не нулевым).   -  person Zhro    schedule 10.12.2016
comment
Тогда, возможно, anyOf ?   -  person Marco13    schedule 10.12.2016
comment
anyOf завершится, если логическое значение вернет null. Мне нужно только будущее, которое возвращает ненулевой результат.   -  person Zhro    schedule 10.12.2016
comment
Аналогичный вопрос с полезным ответом здесь   -  person teppic    schedule 10.12.2016
comment
Это похоже, но по-прежнему зависит от anyOf, который завершится с нулевым значением.   -  person Zhro    schedule 10.12.2016


Ответы (1)


Если вы знаете, что по крайней мере один из CF в списке завершится с ненулевым значением, вы можете попробовать следующее:

public static <T> CompletableFuture<T> firstNonNull(List<CompletableFuture<T>> completableFutures) {

    final CompletableFuture<T> completableFutureResult = new CompletableFuture<>();
    completableFutures.forEach(cf -> cf.thenAccept(v -> {
        if (v != null) {
            completableFutureResult.complete(v);
        }
    }));
    return completableFutureResult;
}

Если нет гарантии, что хотя бы одна из CF вернет ненулевое значение, нужно что-то более сложное:

public static <T> CompletableFuture<T> firstNonNull(List<CompletableFuture<T>> completableFutures, T defaultValue) {

    final CompletableFuture<T> completableFutureResult = new CompletableFuture<>();
    completableFutures.forEach(cf -> cf.thenAccept(v -> {
        if (v != null) {
            completableFutureResult.complete(v);
        }
    }));
    //handling the situation where all the CFs returned null 
    CompletableFuture<Void> allCompleted = CompletableFuture
        .allOf((CompletableFuture<?>[]) completableFutures.toArray());
    allCompleted.thenRun(() -> {
        //checking first if any of the completed delivered a non-null value, to avoid race conditions with the block above 
        completableFutures.forEach(cf -> {
            final T result = cf.join();
            if (result != null) {
                completableFutureResult.complete(result);
            }
        });
        //if still not completed, completing with default value
        if ( !completableFutureResult.isDone()) {
            completableFutureResult.complete(defaultValue);
        }
    });
    return completableFutureResult;
}
person Ruben    schedule 10.12.2016
comment
Спасибо за ваш ответ. Но это почти то же самое, что и пример кода в конце моего вопроса, но без обработки исключений и значения по умолчанию. Обратите внимание, что вам не нужно проверять isDone() для уже завершенного будущего. - person Zhro; 10.12.2016
comment
В своем решении вы создаете ненужный Thread и блокируете его. Я показал вам, что вам не нужен дополнительный поток для выполнения той же работы. Обратите внимание, что вам не нужно проверять isDone() на уже завершенном будущем: ну, вы не можете знать, завершено ли оно на этом этапе. Возможно, блок allOf выполняется раньше блока thenAccept. Вы правы с обработкой исключений, но это не было частью вашего вопроса. - person Ruben; 10.12.2016
comment
Я получаю следующее исключение при попытке запустить ваш код. Можешь подтвердить? java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.util.concurrent.CompletableFuture - person Zhro; 11.12.2016
comment
Какая линия? можешь опубликовать пример? - person Ruben; 11.12.2016
comment
@Zhro измените его на completableFutures.stream().toArray(CompletableFuture[]::new);, и все будет в порядке. - person Didier L; 14.12.2016