Play 2.x: реактивная загрузка файлов с помощью Iteratees

Я начну с вопроса: Как использовать Iteratee Scala API для загрузки файла в облачное хранилище (в моем случае Azure Blob Storage, но я не думаю, что сейчас это самое важное)

Задний план:

Мне нужно разбить входные данные на блоки размером около 1 МБ для хранения больших медиафайлов (300 МБ+) в виде BlockBlobs Azure. К сожалению, мои знания Scala все еще плохи (мой проект основан на Java, и единственным использованием Scala в нем будет контроллер загрузки).

Я пробовал с этим кодом: Почему возникает ошибка вызова или выполняется в Iteratee BodyParser, запрос зависает в Play Framework 2.0? (как Input Iteratee) - это работает довольно хорошо, но каждый Element, который я мог использовать, имеет размер 8192 байта, поэтому он слишком мал для отправки нескольких сотен мегабайтных файлов в облако.

Должен сказать, что это совершенно новый для меня подход, и, скорее всего, я что-то неправильно понял (не хочу сказать, что я все неправильно понял ;> )

Я буду признателен за любую подсказку или ссылку, которая поможет мне в этой теме. Если есть какой-либо образец подобного использования, это был бы лучший вариант для меня, чтобы получить представление.


person biesior    schedule 11.08.2012    source источник
comment
Вы ищете перераспределение входных данных на более крупные куски?   -  person Sadache    schedule 15.08.2012


Ответы (4)


По сути, сначала вам нужно перераспределить ввод как более крупные куски, 1024 * 1024 байта.

Сначала давайте создадим Iteratee, который будет потреблять до 1 млн байт (хорошо, чтобы последний фрагмент был меньше)

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()

Используя это, мы можем создать Enumeratee (адаптер), который будет перегруппировывать куски, используя API под названием grouped:

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

Здесь grouped использует Iteratee, чтобы определить, сколько помещать в каждый фрагмент. Для этого он использует наш потребляемый AMB. Это означает, что результатом является Enumeratee, который повторно разбивает входные данные на Array[Byte] размером 1 МБ.

Теперь нам нужно написать BodyParser, который будет использовать метод Iteratee.foldM для отправки каждой порции байтов:

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

foldM передает состояние и использует его в своей переданной функции (S,Input[Array[Byte]]) => Future[S] для возврата нового будущего состояния. foldM не будет вызывать функцию снова до тех пор, пока Future не будет завершено и не появится доступный кусок ввода.

И парсер тела будет перераспределять входные данные и помещать их в хранилище:

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

Возврат права указывает, что вы возвращаете тело к концу синтаксического анализа тела (который здесь является обработчиком).

person Community    schedule 15.08.2012
comment
Хорошее объяснение. Два вопроса: (1) Что делает Iteratee.foldM? Не могу найти это в документации API здесь: playframework.org/documentation/api/2.0.2/scala/ (2) Зачем нужен map(Right(_))? Было бы здорово, если бы вы добавили что-то об этом в свой пост. - person lost; 15.08.2012
comment
Спасибо, Садек, мне нужно время, чтобы проверить это. - person biesior; 16.08.2012
comment
@Sadache: кажется, что Iteratee.foldM[E,A] помещается в master, но не в 2.0.3, это правда? Мы собираемся использовать стабильную версию для этого производства. Вы планируете новый релиз в ближайшее время? - person biesior; 16.08.2012
comment
Да, но вы также можете пока скопировать код метода foldM. - person Sadache; 17.08.2012
comment
К вашему сведению, этот ответ обсуждается по адресу stackoverflow.com/questions/12609451/. - person Rich Dougherty; 14.11.2013

Если ваша цель — стримить на S3, вот помощник, который я реализовал и протестировал:

def uploadStream(bucket: String, key: String, enum: Enumerator[Array[Byte]])
                (implicit ec: ExecutionContext): Future[CompleteMultipartUploadResult] = {
  import scala.collection.JavaConversions._

  val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  val initResponse = s3.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val rechunker: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped {
    Traversable.takeUpTo[Array[Byte]](5 * 1024 * 1024) &>> Iteratee.consume()
  }

  val uploader = Iteratee.foldM[Array[Byte], Seq[PartETag]](Seq.empty) { case (etags, bytes) =>
    val uploadRequest = new UploadPartRequest()
      .withBucketName(bucket)
      .withKey(key)
      .withPartNumber(etags.length + 1)
      .withUploadId(uploadId)
      .withInputStream(new ByteArrayInputStream(bytes))
      .withPartSize(bytes.length)

    val etag = Future { s3.uploadPart(uploadRequest).getPartETag }
    etag.map(etags :+ _)
  }

  val futETags = enum &> rechunker |>>> uploader

  futETags.map { etags =>
    val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toBuffer[PartETag])
    s3.completeMultipartUpload(compRequest)
  }.recoverWith { case e: Exception =>
    s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId))
    Future.failed(e)
  }

}
person atamborrino    schedule 19.09.2014
comment
как вы используете этот метод с контроллером? - person Pradeep; 24.10.2014

добавьте следующее в ваш файл конфигурации

play.http.parser.maxMemoryBuffer=256K

person dibble    schedule 11.01.2016

Для тех, кто также пытается найти решение этой проблемы с потоковой передачей, вместо того, чтобы писать полностью новый BodyParser, вы также можете использовать то, что уже реализовано в parse.multipartFormData. Вы можете реализовать что-то вроде ниже, чтобы перезаписать обработчик по умолчанию handleFilePartAsTemporaryFile.

def handleFilePartAsS3FileUpload: PartHandler[FilePart[String]] = {
  handleFilePart {
    case FileInfo(partName, filename, contentType) =>

      (rechunkAdapter &>> writeToS3).map {
        _ =>
          val compRequest = new CompleteMultipartUploadRequest(...)
          amazonS3Client.completeMultipartUpload(compRequest)
          ...
      }
  }
}

def multipartFormDataS3: BodyParser[MultipartFormData[String]] = multipartFormData(handleFilePartAsS3FileUpload)

Я могу сделать эту работу, но я все еще не уверен, передается ли весь процесс загрузки. Я попробовал несколько больших файлов, кажется, что загрузка S3 начинается только тогда, когда весь файл был отправлен со стороны клиента.

Я посмотрел на приведенную выше реализацию парсера и думаю, что все подключено с помощью Iteratee, поэтому файл должен быть потоковым. Если у кого-то есть понимание этого, это будет очень полезно.

person Sheng    schedule 13.08.2014