Как использовать CompletableFuture, не рискуя ошибкой StackOverflowError?

Я хочу пройтись по пространству поиска асинхронной функции. Я закодировал логику следующим образом:

/**
 * Assuming that a function maps a range of inputs to the same output value, minimizes the input value while
 * maintaining the output value.
 *
 * @param previousInput the last input known to return {@code target}
 * @param currentInput  the new input value to evaluate
 * @param function      maps an input to an output value
 * @param target        the expected output value
 * @return the minimum input value that results in the {@code target} output value
 * <br>{@code @throws NullPointerException} if any argument is null
 * <br>{@code @throws IllegalArgumentException} if {@code stepSize} is zero}
 */
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                         BigDecimal currentInput,
                                                         BigDecimal stepSize,
                                                         Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                         BigDecimal target)
{
    return function.apply(currentInput).thenCompose(output ->
    {
        assertThat("stepSize", stepSize).isNotZero();
        int outputMinusTarget = output.compareTo(target);
        if (outputMinusTarget != 0)
            return CompletableFuture.completedFuture(previousInput);
        BigDecimal nextInput = currentInput.add(stepSize);
        if (nextInput.compareTo(BigDecimal.ZERO) < 0)
            return CompletableFuture.completedFuture(previousInput);
        return optimizeInput(currentInput, nextInput, stepSize, function, target);
    });
}

К сожалению, если функция имеет большое пространство поиска, это вызовет StackoverflowError после некоторых итераций. Можно ли итеративно пройтись по пространству поиска со стеком фиксированного размера?


person Gili    schedule 24.04.2018    source источник
comment
Ваш function на самом деле асинхронный? В противном случае это делает optimizeInput() простым рекурсивным методом. Кроме того, кажется, что вы ничего не распараллеливаете в этом коде, поэтому не проще ли было бы реализовать это без использования CompletableFuture (может быть, просто оберните первоначальный вызов в supplyAsync()). Было бы неплохо предоставить образец function и соответствующую трассировку стека.   -  person Didier L    schedule 26.04.2018
comment
@DidierL function может быть синхронным или асинхронным. Разные вызывающие абоненты передают функции разного типа. Код не знает заранее, но он должен обрабатывать оба случая без StackoverflowError.   -  person Gili    schedule 27.04.2018


Ответы (2)


у вас есть следующая структура рекурсии

CompletableFuture<T> compute(...) {
  return asyncTask().thenCompose(t -> {
    if (...)
      return completedFuture(t);
    } else {
      return compute(...);
    }
  }
}

Вы можете переписать его, избегая завершаемой будущей композиции и использования ее стека во время завершения.

CompletableFuture<T> compute(...) {
  CompletableFuture<T> result = new CompletableFuture<>();
  computeHelper(result, ...);
  return result;
}   

void computeHelper(CompletableFuture<T> result, ...) {
  asyncTask().thenAccept(t -> {
    if (...) {
      result.complete(t);
    } else {
      computeHelper(result, ...);
    }
  });
}

если asyncTask() на самом деле не является асинхронным и просто использует текущий поток, вы должны заменить thenAccept одной из его асинхронных версий, чтобы использовать очередь задач исполнителя вместо стека потоков.

person dfogni    schedule 26.04.2018
comment
Разные вызывающие объекты передают разные виды функций. Некоторые вызывающие объекты передают синхронные функции, а другие передают асинхронные. Этот код должен иметь возможность обрабатывать оба типа функций. - person Gili; 27.04.2018
comment
@Gili, затем используйте thenAcceptAsync, возможно, с выделенным исполнителем - person dfogni; 28.04.2018
comment
Понятно. Итак, вы говорите, что мы должны оптимизировать для асинхронного случая за счет снижения производительности для случая синхронизации. По крайней мере, StackoverflowError не произойдет. Спасибо. - person Gili; 29.04.2018

Ответ dfogni должен работать нормально, но для полноты можно избежать передачи обслуживания исполнителя в случае, когда метод является синхронным с использованием трамплининг.

Чтобы упростить задачу, я представил класс, фиксирующий состояние, изменяющееся между итерациями, и представляющие методы, реализующие проверки завершения и генерирующие следующее состояние. Я считаю, что это то же самое, что и ваша первоначальная логика, но вы можете проверить это трижды.

private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                          BigDecimal currentInput,
                                                          BigDecimal stepSize,
                                                          Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                          BigDecimal target) {
    class State {
        BigDecimal prev;
        BigDecimal curr;
        BigDecimal output;

        State(BigDecimal prev, BigDecimal curr, BigDecimal output) {
            this.prev = prev;
            this.curr = curr;
            this.output = output;
        }

        boolean shouldContinue() {
            return output.compareTo(target) == 0 && curr.add(stepSize).compareTo(BigDecimal.ZERO) >= 0;
        }

        CompletionStage<State> next() {
            BigDecimal nextInput = curr.add(stepSize);
            return function.apply(nextInput).thenApply(nextOutput -> new State(curr, nextInput, nextOutput));
        }
    }

    /* Now it gets complicated... we have to check if we're running on the same thread we were called on. If we
     * were, instead of recursively calling `next()`, we'll use PassBack to pass our new state back 
     * to the stack that called us.
     */
    class Passback {
        State state = null;
        boolean isRunning = true;

        State poll() {
            final State c = this.state;
            this.state = null;
            return c;
        }
    }
    class InputOptimizer extends CompletableFuture<BigDecimal> {
        void optimize(State state, final Thread previousThread, final Passback previousPassback) {
            final Thread currentThread = Thread.currentThread();

            if (currentThread.equals(previousThread) && previousPassback.isRunning) {
                // this is a recursive call, our caller will run it
                previousPassback.state = state;
            } else {
                Passback passback = new Passback();
                State curr = state;
                do {
                    if (curr.shouldContinue()) {
                        curr.next().thenAccept(next -> optimize(next, currentThread, passback));
                    } else {
                        complete(curr.prev);
                        return;
                    }
                // loop as long as we're making synchronous recursive calls
                } while ((curr = passback.poll()) != null);
                passback.isRunning = false;
            }
        }
    }

    InputOptimizer ret = new InputOptimizer();
    function.apply(currentInput)
            .thenAccept(output -> ret.optimize(
                    new State(previousInput, currentInput, output),
                    null, null));
    return ret;
}

Итак, это довольно сложно. Кроме того, обратите внимание, что для этого требуется, чтобы ваша функция никогда не выдавала исключение или не завершалась исключительно, что может быть проблематично. Вы можете сгенерировать это, чтобы вам нужно было написать его только один раз (с правильной обработкой исключений), который можно найти в библиотека asyncutil (Отказ от ответственности: я соавтор этой библиотеки). Могут быть другие библиотеки с аналогичной функциональностью, скорее всего, зрелая реактивная библиотека, такая как Rx. Используя асинктуил,

 private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                          BigDecimal currentInput,
                                                          BigDecimal stepSize,
                                                          Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                          BigDecimal target) {
    // ... State class from before
    return function
            .apply(currentInput)
            .thenCompose(output -> AsyncTrampoline.asyncWhile(
                    State::shouldContinue, 
                    State::next, 
                    new State(previousInput, currentInput, output)))
            .thenApply(state -> state.prev);    
}
person Ravi K    schedule 07.05.2018