Котлин - сопрограммы работают не так, как ожидалось

Этот вопрос связан с одним из моих предыдущих вопросов: Котлин - сопрограммы с циклами.

Итак, это моя текущая реализация:

fun propagate() = runBlocking {
    logger.info("Propagating objectives...")
    val variablesWithSetObjectives: List<ObjectivePropagationMapping> =
        variables.filter { it.variable.objective != Objective.NONE }
    variablesWithSetObjectives.forEach { variableWithSetObjective ->
        logger.debug("Propagating objective ${variableWithSetObjective.variable.objective} from variable ${variableWithSetObjective.variable.name}")
        val job: Job = launch {
            propagate(variableWithSetObjective, variableWithSetObjective.variable.objective, this, variableWithSetObjective)
        }
        job.join()
        traversedVariableNames.clear()
    }
    logger.info("Done")
}

private tailrec fun propagate(currentVariable: ObjectivePropagationMapping, objectiveToPropagate: Objective, coroutineScope: CoroutineScope, startFromVariable: ObjectivePropagationMapping? = null) {
    if (traversedVariableNames.contains(currentVariable.variable.name)) {
        logger.debug("Detected loopback condition, stopping propagation to prevent loop")
        return
    }
    traversedVariableNames.add(currentVariable.variable.name)
    val objectiveToPropagateNext: Objective =
        if (startFromVariable != currentVariable) {
            logger.debug("Propagating objective $objectiveToPropagate to variable ${currentVariable.variable.name}")
            computeNewObjectiveForVariable(currentVariable, objectiveToPropagate)
        }
        else startFromVariable.variable.objective
    logger.debug("Choosing variable to propagate to next")
    val variablesToPropagateToNext: List<ObjectivePropagationMapping> =
        causalLinks
            .filter { it.toVariable.name == currentVariable.variable.name }
            .map { causalLink -> variables.first { it.variable.name == causalLink.fromVariable.name } }
    if (variablesToPropagateToNext.isEmpty()) {
        logger.debug("Detected end of path, stopping propagation...")
        return
    }
    val variableToPropagateToNext: ObjectivePropagationMapping = variablesToPropagateToNext.random()
    logger.debug("Chose variable ${variableToPropagateToNext.variable.name} to propagate to next")
    if (variablesToPropagateToNext.size > 1) {
        logger.debug("Detected split condition")
        variablesToPropagateToNext.filter { it != variableToPropagateToNext }.forEach {
            logger.debug("Launching child thread for split variable ${it.variable.name}")
            coroutineScope.launch {
                propagate(it, objectiveToPropagateNext, this)
            }
        }
    }
    propagate(variableToPropagateToNext, objectiveToPropagateNext, coroutineScope)
}

В настоящее время я использую алгоритм со следующей топологией переменных (обратите внимание, что алгоритм следует стрелкам, идущим к переменной, но не стрелкам, выходящим из переменной):  Переменная топология

В настоящее время я получаю следующий результат отладочной печати: https://pastebin.com/ya2tmc6s.

Как видите, хотя я запускаю сопрограммы, они не начинают выполняться, пока основная рекурсивная функция распространения не завершит исследование полного пути.

Вместо этого я бы хотел, чтобы запущенные сопрограммы начали выполняться немедленно ...


person Jojo01    schedule 11.10.2019    source источник
comment
Я не понимаю на 100%, что функция пытается сделать, но ваши сопрограммы комментариев (...) не начнут выполняться, пока не завершится основная рекурсивная функция распространения (...) и строки кода: 7-10 выглядят странно. Вы launch сопрограмма и join сразу после нее - это не имеет значения. Может быть, вы захотите запустить с onEach( launch { } ), а потом (!) Присоединиться к .forEach { it.join() }   -  person Neo    schedule 11.10.2019


Ответы (1)


Если не указано иное, все сопрограммы, которые вы запускаете в runBlocking, будут выполняться в одном потоке.

Если вы хотите включить многопоточность, вы можете просто изменить это на runBlocking(Dispatchers.Default). Я просто предполагаю, что весь этот код является потокобезопасным.

Если вы действительно не хотите включать многопоточность, вам действительно не нужно заботиться о том, в каком порядке работают сопрограммы.

person Matt Timmermans    schedule 11.10.2019