Как выполнить команду, которая производит бесконечный вывод и немедленно возвращается

Когда я пишу следующий код (в аммоните, но я не думаю, что это имеет значение)

("tail -f toTail.txt" lineStream) foreach(println(_)), программа дает мне последнюю строку, как и предполагалось, но затем зависает, и даже если я напишу больше в файле, ничего не выйдет.

Как API поддерживает процесс с неограниченным выходом?

Пытаюсь написать val myStream = ("tail -f toTail.txt" lineStream_!) но все равно не возвращает запись

Вот что говорит scala doc:

lineStream: возвращается немедленно, как и run, а генерируемый вывод предоставляется через Stream[String]. Получение следующего элемента этого потока может блокироваться до тех пор, пока он не станет доступным.

Поэтому я не понимаю, почему он блокирует

Кстати, у меня точно такое же поведение с Ammonite API.

Если еще раз набрать %%("tail", "-f", "toTail.txt"), метод просто зависнет и не вернется сразу.


person MaatDeamon    schedule 03.09.2017    source источник
comment
Опишите, что вы имеете в виду под не работает правильно   -  person pedromss    schedule 03.09.2017
comment
Я ожидаю, что будет возвращен inputStream. но он не возвращает, он просто блокируется там. Возможно, я немного запутался в том, как работает класс потока. Предполагается, что данные должны быть конечными, а не неограниченными для возвращаемого объекта потока?   -  person MaatDeamon    schedule 03.09.2017
comment
Насколько я понимаю, поток - это компьютер лениво. так что это не должно быть проблемой docs.scala -lang.org/overviews/collections/   -  person MaatDeamon    schedule 03.09.2017
comment
Проблема может возникнуть из-за метода lineStream Scala Process Builder.   -  person MaatDeamon    schedule 03.09.2017
comment
Думаю, вопрос немного неясен (ничего собственно не зависает, и сразу ничего возвращаться не должно), но проблема есть. Чтобы уточнить: ожидается, что "tail -f foo.txt".lineStream.foreach(println) не вернется сразу же, он должен сидеть там и распечатывать новые строки по мере их добавления к foo.txt. Она делает первое (сидит там), но не второе — никаких дополнительных строк не печатается, так как они добавляются в файл после запуска команды.   -  person Dima    schedule 04.09.2017
comment
Чтобы было еще понятнее, можно разбить его на команды: val stream = "tail -f".lineStream — возвращает сразу. stream.foreach(println) — выводит конец файла, а затем блоки, но не выводит больше строк по мере их добавления.   -  person Dima    schedule 04.09.2017


Ответы (2)


С ProcessBuilder проблем нет (по крайней мере, тех, которые вытекают из вашего варианта использования). Из документации по ProcessBuilder:

Запуск процессов

Для выполнения всех внешних команд, связанных с ProcessBuilder, можно использовать одну из четырех групп методов. Каждый из этих методов имеет различные перегрузки и варианты для обеспечения дополнительного контроля над вводом-выводом. Эти методы:

  • run: наиболее общий метод, он немедленно возвращает scala.sys.process.Process, а внешняя команда выполняется одновременно.
  • !: блокируется до выхода всех внешних команд и возвращает код выхода последней в цепочке выполнения.
  • !!: блокируется до выхода всех внешних команд и возвращает строку со сгенерированным выводом.
  • lineStream: возвращается немедленно, как и run, а генерируемый вывод предоставляется через Stream[String]. Получение следующего элемента этого потока может быть заблокировано, пока он не станет доступным. Этот метод вызовет исключение, если код возврата отличен от нуля. Если это нежелательно, используйте функцию lineStream_! метод.

В документации четко указано, что lineStream может блокироваться до тех пор, пока не станет доступной следующая строка. Поскольку природа tail -f представляет собой бесконечный поток строк lineBreak, программа заблокируется в ожидании появления следующей строки.

Для следующего предположим, что у меня есть файл: /Users/user/tmp/sample.txt и его содержимое:

boom
bar
cat

Почему lineStream_! не ошибается

import scala.language.postfixOps
import scala.sys.process._

object ProcessBuilder extends App {

  val myStream: Stream[String] = ("tail /Users/user/tmp/sample.txt" lineStream_!)
  println("I'm after tail!")
  myStream.filter(_ != null).foreach(println)
  println("Finished")
  System.exit(0)

}

Выходы:

I'm after tail!
boom
bar
cat
Finished

Итак, вы видите, что lineStream_! немедленно вернулся. Потому что природа команды конечна.

Как немедленно вернуться из команды, которая производит бесконечный вывод:

Давайте попробуем это с tail -f. Вам нужно больше контроля над вашим процессом. Опять же, как говорится в документации:

Если требуется полный контроль над вводом и выводом, можно использовать scala.sys.process.ProcessIO с run.

Итак, для примера:

import java.io.{BufferedReader, InputStreamReader}

import scala.language.postfixOps
import scala.sys.process._

object ProcessBuilder extends App {

  var reader: BufferedReader = _
  try {

    var myStream: Stream[String] = Stream.empty
    val processIO = new ProcessIO(
      (os: java.io.OutputStream) => ??? /* Send things to the process here */,
      (in: java.io.InputStream) => {

        reader = new BufferedReader(new InputStreamReader(in))
        myStream = Stream.continually(reader.readLine()).takeWhile(_ != "ff")
      },
      (in: java.io.InputStream) => ???,
      true
    )
    "tail -f /Users/user/tmp/sample.txt".run(processIO)
    println("I'm after the tail command...")

    Thread.sleep(2000)
    println("Such computation performed while tail was active!")

    Thread.sleep(2000)
    println("Such computation performed while tail was active again!")

    println(
      s"Captured these lines while computing: ${myStream.print(System.lineSeparator())}")

    Thread.sleep(2000)
    println("Another computation!")

  } finally {
    Option(reader).foreach(_.close())
  }
  println("Finished")
  System.exit(0)

}

Выходы:

I'm after the tail command...
Such computation performed while tail was active!
Such computation performed while tail was active again!
boom
bar
cat

Он по-прежнему возвращается немедленно, и теперь он просто висит там, ожидая большего ввода. Если я делаю echo 'fff' >> sample.txt из каталога tmp, программа выводит:

Another computation!
Finished

Теперь у вас есть возможность выполнять любые вычисления, которые вы хотите, после выдачи команды tail -f и возможность завершить ее в зависимости от условия, которое вы передаете методу takeWhile (или другим методам, закрывающим входной поток).

Дополнительные сведения о ProcessIO см. в документации здесь.

person pedromss    schedule 04.09.2017
comment
Я думаю, вы отвечаете на неправильный вопрос: проблема оператора, похоже, заключается в том, что "tail -f foo.txt".lineStream.foreach(println) не печатает новые строки, которые добавляются в foo.txt после запуска команды. Это действительно вызывает недоумение... - person Dima; 04.09.2017
comment
Поэтому я не понимаю, почему он блокируется - я ответил, почему он блокируется. И предоставил альтернативы. Может быть, он сможет объяснить, когда увидит это :) - person pedromss; 04.09.2017
comment
@Dima "tail -f foo.txt".lineStream.foreach(println) печатает строки, которые отправляются в файл. :) - person pedromss; 04.09.2017
comment
Зависит от того, как вы их отправляете в файл (а также, возможно, от вашей ОС). Смотрите мой ответ... - person Dima; 04.09.2017
comment
Может быть. Я отправляю их с echo и оператором >>. Бег El capitan 10.11.6 - person pedromss; 04.09.2017
comment
да, это работает. но не в том случае, если вы просто откроете файл в редакторе, наберете что-нибудь в конце и сохраните. Это работает с -F, но не с -f - person Dima; 04.09.2017
comment
@pedromss Я благодарю вас за этот отличный ответ. Спасибо. У меня есть 1 уточнение, которое мне скорее понадобится. Поскольку природа tail -f представляет собой бесконечный поток строк lineBreak, программа заблокируется в ожидании появления следующей строки. Не сказано почему? Чем отличается реализация LineStream от вашей, я не понимаю, почему ваша возвращает, а linestream нет. Другими словами, что вы не делаете в своем коде, что позволяет вам не блокировать? - person MaatDeamon; 06.09.2017
comment
Мой не блокируется до бесконечности, потому что в потоке есть условие остановки, которое считывает вывод команды tail. - person pedromss; 06.09.2017
comment
@pedromss говорит, что я выполняю команду, которая возвращает 6 миллионов строк. Это конечно вообще не висит. Что произойдет, если я использую линейный поток. В память все загружено? В идеале я хотел бы потреблять эти 6 миллионов строк в потоковом режиме, не загружать их в память моей программы, а постоянно получать часть из потока, обрабатывать его, а затем переходить к следующим. Сможет ли лайнстрим справиться с этим? Или я должен использовать процесс ввода-вывода по этому вопросу? Кроме того, если я не знаю конечного персонажа, какие у меня есть варианты? - person MaatDeamon; 06.09.2017
comment
Я мог бы убедиться, что линейный поток действительно ожидает полного результата, прежде чем возвращать поток поверх него. Я столкнулся с нехваткой памяти при выполнении команды, которая возвращает огромное количество данных. - person MaatDeamon; 07.09.2017
comment
Я предполагаю, что меня интересует фактический поток ввода и вывода базового выполнения команды java sys. - person MaatDeamon; 07.09.2017
comment
@MaatDeamon параметр in используемого обратного вызова является входным потоком выполнения базовой команды. - person pedromss; 07.09.2017
comment
Я предполагаю, что если кто-то просто хочет связать результат одной команды в граф Akka-Stream Stream, используя параметр in, как вы указали, это нормально. Однако, если необходимо выполнить многие из этих вызовов, тогда возникают проблемы, то есть если ваш поток начинается с системного вызова, затем этапа, выполняющего некоторую операцию, а затем системного вызова в flatmapContact и т. д., в этом случае, модель ProcessIO не работает. Придется вернуться к классу java, потому что необходимо вернуть java.io.stream. Но это противоположность этой модели... Я правильно понял? - person MaatDeamon; 08.09.2017

Я думаю, это связано с тем, как вы добавляете данные в файл, а не с ProcessBuilder. Если вы используете его в редакторе, например, для добавления данных, он перезаписывает весь файл при каждом сохранении с другим индексным узлом, и tail этого не обнаруживает.

Попробуйте сделать tail -F вместо tail -f, должно сработать.

person Dima    schedule 04.09.2017