ZIO смотреть события файловой системы

помогите мне как организовать сканирование каталогов на ZIO. Это моя версия, но она не отслеживает все события создания файлов (пропускает некоторые события).

object Main extends App {

  val program = for {
    stream <- ZIO.succeed(waitEvents)
    _ <- stream.run(ZSink.foreach(k => putStrLn(k.map(e => (e.kind(), e.context())).mkString("\n"))))
  } yield ()

  val managedWatchService = ZManaged.make {
    for {
      watchService <- FileSystem.default.newWatchService
      path = Path("c:/temp")
      _ <- path.register(watchService,
        StandardWatchEventKinds.ENTRY_CREATE,
        StandardWatchEventKinds.ENTRY_DELETE
      )
    } yield watchService
  }(_.close.orDie)

  val lookKey = ZManaged.make {
    managedWatchService.use(watchService => watchService.take)
  }(_.reset)

  val waitEvents = ZStream.fromEffect {
    lookKey.use(key => key.pollEvents)
  }.repeat(Schedule.forever)

  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] =
    program
      .provideLayer(Console.live ++ Blocking.live ++ Clock.live)
      .exitCode
  
}

Спасибо за ваш совет.


person grednoud    schedule 14.12.2020    source источник


Ответы (1)


Вы заставляете свой WatchService выключаться и воссоздавать заново каждый раз, когда запрашиваете события. Поскольку это, вероятно, связано с некоторыми системными дескрипторами, это, вероятно, будет довольно медленным, поэтому вы, вероятно, пропустите файловые события, которые происходят между ними. Скорее всего, вы захотите произвести WatchService один раз, а затем повторно его опрашивать. Вместо этого я бы предложил что-то вроде этого:

object Main extends App {        
  val managedWatchService = ZManaged.make {
    for {
      watchService <- FileSystem.default.newWatchService
      path = Path("c:/temp")
      _ <- path.register(watchService,
        StandardWatchEventKinds.ENTRY_CREATE,
        StandardWatchEventKinds.ENTRY_DELETE
      )
    } yield watchService
  }(_.close.orDie)
      
  // Convert ZManaged[R, E, ZStream[R, E, A]] into ZStream[R, E, A]
  val waitEvents = ZStream.unwrapManaged(
    managedWatchService.mapM(_.take).map { key =>
      // Use simple effect composition instead of a managed for readability.
      ZStream.repeatEffect(key.pollEvents <* key.reset)
       // Optional: Flatten the `List` of values that is returned
       .flattenIterables
    }
  )

  val program = waitEvents
    .map(e => (e.kind(), e.context()).toString)
    .foreach(putStrLn).unit

  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] =
    program
      .provideLayer(Console.live ++ Blocking.live ++ Clock.live)
      .exitCode
  
}

Также в качестве примечания, при использовании ZManaged вы, вероятно, не захотите делать

ZManaged.make(otherManaged.use(doSomething))(tearDown)

потому что вы приведете к нарушению порядка выполнения финализаторов. ZManaged уже может обрабатывать порядок разборки только через обычную flatMap композицию.

otherManaged.flatMap { other => ZManaged.make(doSomething(other))(tearDown) }
person paulpdaniels    schedule 14.12.2020