Асинхронное построение производителя

У меня есть случай, когда я хочу построить ReceiveChannel, используя produce, асинхронно, но он зависает. Вот упрощенный пример:

runBlocking {
    val deferredChannel = async {
        produce<String> { send("foo") }
    }

    val channel = deferredChannel.await()

    println("Got channel")

    val value = channel.receive()

    println("Got value $value")
}

Ни один println не пострадал. Вероятно, что здесь происходит какая-то тупиковая ситуация с сопрограммами, но я не понимаю, где и как.

Как я могу создать ReceiveChannel асинхронно?

Изменить: это сработает, если я изменю produce на produce(capacity = 1), но почему? Разве await() не должна добиться успеха, по крайней мере, независимо от возможностей производителя? А что, если я хочу, чтобы емкость была равна 0?


person Max    schedule 03.06.2019    source источник
comment
Изменение async на async(context = newSingleThreadContext("producer")) не оказывает никакого влияния.   -  person Max    schedule 03.06.2019
comment
It works if I change produce to produce(capacity = 1) - только потому, что вы производите всего один элемент. В производственном случае он будет продолжать производство до тех пор, пока не заполнит буфер.   -  person Marko Topolnik    schedule 04.06.2019
comment
Если бы я ожидал, что async { будет вести себя как thread {, не имеет значения, что я помещаю в блок - он будет выполняться независимо. Но здесь этого не происходит из-за некоторого несоответствия в моем понимании.   -  person Max    schedule 07.06.2019
comment
Он ведет себя как thread {, пока вы не await() на результате, который не имеет эквивалента в thread {, но сравним с executor.submit(task).get(), который блокирует таким же образом, потому что вы создаете тупик, когда отправленная задача ожидает, когда вы примете свой элемент, но вы не принимайте его, пока он не будет завершен.   -  person Marko Topolnik    schedule 07.06.2019


Ответы (1)


Он работает, если я изменяю производство на производство (мощность = 1), но почему? Разве await () не должна завершиться успешно, по крайней мере, независимо от возможностей производителя?

Проверка документации на вызываемый вами метод produce() и, в частности, docs о параметре емкости и Channel у нас есть (выделено мной):

Если емкость равна 0 - создается RendezvousChannel. На этом канале вообще нет буфера. Элемент передается от отправителя к получателю только тогда, когда вызовы отправки и получения встречаются вовремя (рандеву), поэтому отправка приостанавливается до тех пор, пока другая сопрограмма не вызовет получение, а получение приостанавливается до тех пор, пока не вызовет другую сопрограмму. Отправить.

Это могло быть причиной того, что он завис. Вы вызываете send в своем async потоке, а затем await для него ... однако, как говорится в документации, никакая другая подпрограмма еще не вызвала receive ... поэтому он будет приостановлен до тех пор, пока это не произойдет, и в этом случае он зависнет.

Проверяя ту же ссылку на Channel, мы также видим, почему присвоение ей числа больше 0 решает эту проблему (выделено мной):

Если емкость положительна, но меньше НЕОГРАНИЧЕННОЙ - создается канал на основе массива с заданной емкостью. Этот канал имеет буфер массива фиксированной емкости. Отправитель приостанавливает работу только при заполнении буфера, а приемник приостанавливает работу только при пустом буфере.

person DarkCygnus    schedule 03.06.2019