Передать поток в s3.upload ()

В настоящее время я использую плагин node.js под названием s3-upload-stream для потоковой передачи очень больших файлов в Amazon S3. Он использует составной API и по большей части работает очень хорошо.

Однако этот модуль показывает свой возраст, и мне уже пришлось внести в него изменения (автор также отказался от него). Сегодня я столкнулся с другой проблемой с Amazon, и я действительно хотел бы воспользоваться рекомендацией автора и начать использовать официальный aws-sdk для выполнения своих загрузок.

НО.

Официальный SDK, похоже, не поддерживает подключение к s3.upload(). Природа s3.upload заключается в том, что вы должны передать читаемый поток в качестве аргумента конструктору S3.

У меня есть примерно 120+ модулей пользовательского кода, которые выполняют различную обработку файлов, и они не зависят от конечного пункта назначения своего вывода. Движок передает им записываемый по конвейеру выходной поток, и они передают его по конвейеру. Я не могу передать им AWS.S3 объект и попросить их вызвать upload() без добавления кода ко всем модулям. Причина, по которой я использовал s3-upload-stream, заключалась в том, что он поддерживал трубопровод.

Есть ли способ сделать aws-sdk s3.upload() чем-то, на что я могу направить поток?


person womp    schedule 20.05.2016    source источник


Ответы (11)


Оберните функцию S3 upload() потоком node.js stream.PassThrough().

Вот пример:

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}
person Casey Benko    schedule 21.05.2016
comment
Отлично, это решило мой очень уродливый хак = -) Вы можете объяснить, что на самом деле делает stream.PassThrough ()? - person mraxus; 22.10.2016
comment
Закроется ли ваш сквозной поток, когда вы это сделаете? У меня чертовски много времени, чтобы продвинуть закрытие в s3.upload, чтобы попасть в мой поток PassThrough. - person four43; 15.12.2016
comment
размер загружаемого файла 0 байт. Если я передаю одни и те же данные из исходного потока в файловую систему, все работает хорошо. Любая идея? - person Radar155; 31.05.2017
comment
@ Radar155 вы решили свою проблему? Я наблюдаю такое же поведение. - person lawrence; 06.09.2017
comment
@lawrence Я сделал. но прямо сейчас я не помню как :( Единственное, что я помню, это то, что проблема была связана с некоторыми параметрами, переданными методу загрузки. Возможно, проверьте имя сегмента и ключ. - person Radar155; 07.09.2017
comment
Сквозной поток принимает записанные в него байты и выводит их. Это позволяет вам возвращать доступный для записи поток, из которого aws-sdk будет читать, когда вы в него пишете. Я бы также вернул объект ответа из s3.upload (), потому что в противном случае вы не сможете гарантировать завершение загрузки. - person reconbot; 08.12.2017
comment
Я пробовал этот подход, мой загруженный файл также имеет размер 0 байт. Может ли кто-нибудь мне с этим помочь? - person gabo; 23.12.2018
comment
Я также получаю 0-байтовый файл на S3. - Это неполное решение? @CaseyBenko - person nilsw; 06.02.2019
comment
Я также видел 0 байтов, когда делал это через экспресс-маршрут. Я изменил тип содержимого на text / plain, и он отлично работает. - person Tsar Bomba; 21.08.2019
comment
Разве это не то же самое, что передача читаемого потока в Body, но с дополнительным кодом? AWS SDK по-прежнему будет вызывать read () в потоке PassThrough, поэтому нет настоящего соединения на всем пути к S3. Единственная разница в том, что посередине есть дополнительный поток. - person ShadowChaser; 02.04.2020
comment
откуда берутся параметры s3 внутри трубы и stream? - person Blackjack; 17.04.2020

Немного запоздалый ответ, надеюсь, это может помочь кому-то другому. Вы можете вернуть как записываемый поток, так и обещание, чтобы вы могли получить данные ответа после завершения загрузки.

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

И вы можете использовать функцию следующим образом:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

const pipeline = readStream.pipe(writeStream);

Теперь вы можете проверить обещание:

promise.then(() => {
  console.log('upload completed successfully');
}).catch((err) => {
  console.log('upload failed.', err.message);
});

Или используя async / await:

try {
    await promise;
    console.log('upload completed successfully');
} catch (error) {
    console.log('upload failed.', error.message);
}

Или, поскольку stream.pipe() возвращает stream.Writable, пункт назначения (переменная writeStream выше), учитывая цепочку каналов, мы также можем использовать его события:

 pipeline.on('close', () => {
   console.log('upload successful');
 });
 pipeline.on('error', (err) => {
   console.log('upload failed', err.message)
 });
person Ahmet Cetin    schedule 11.05.2018
comment
Выглядит отлично, но со своей стороны я получаю эту ошибку stackoverflow.com/questions/62330721/ - person Arco Voltaico; 11.06.2020
comment
только что ответил на ваш вопрос. Надеюсь, это поможет. - person Ahmet Cetin; 12.06.2020
comment
Я вызываю это в асинхронной функции, поэтому использую await Promise. Работает для меня, спасибо - это была такая огромная и неожиданная проблема для меня. - person Matthias Herrmann; 14.03.2021
comment
Сделал мой день. Благодарность:) - person Rohit Kumar; 05.06.2021

В принятом ответе функция завершается до завершения загрузки и, следовательно, неверна. Приведенный ниже код правильно передает из читаемого потока.

Загрузить ссылку

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}

Вы также можете пойти дальше и вывести информацию о ходе выполнения, используя ManagedUpload как таковое:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});

Справочник по управляемой загрузке

Список доступных событий

person tsuz    schedule 13.11.2017
comment
aws-sdk теперь предлагает обещания, встроенные в 2.3.0+, поэтому вам больше не нужно их отменять. s3.upload (params) .promise (). then (data = ›data) .catch (error =› error); - person DBrown; 16.12.2017
comment
@DBrown Спасибо за указатель! Соответственно, я обновил ответ. - person tsuz; 17.12.2017
comment
@tsuz, пытаясь реализовать ваше решение, выдает ошибку: TypeError: dest.on is not a function, есть идеи, почему? - person FireBrand; 03.01.2018
comment
Что такое dest.on? Вы можете показать пример? @FireBrand - person tsuz; 04.01.2018
comment
@tsuz Я думаю, это была проблема с узлом, в итоге я использовал этот пакет: npmjs.com/package / streaming-s3, это дало мне то, что мне было нужно, спасибо. - person FireBrand; 10.01.2018
comment
Это говорит о том, что принятый ответ является неполным, но он не работает с подключением к s3.upload, как указано в обновленном сообщении @ Womp. Было бы очень полезно, если бы этот ответ был обновлен, чтобы получить вывод по конвейеру из чего-то еще! - person MattW; 04.05.2018

Ни один из ответов не помог мне, потому что я хотел:

  • Вставить s3.upload()
  • Перенаправить результат s3.upload() в другой поток

Принятый ответ не делает последнего. Остальные полагаются на обещание api, которое неудобно при работе с потоковыми конвейерами.

Это моя модификация принятого ответа.

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})

person cortopy    schedule 29.07.2019
comment
Выглядит отлично, но со своей стороны я получаю эту ошибку stackoverflow.com/questions/62330721/… - person Arco Voltaico; 11.06.2020

Решение Type Script:
В этом примере используются:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

И асинхронная функция:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}

Вызовите этот метод где-нибудь, например:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
person dzole vladimirov    schedule 25.07.2018
comment
Привет @dzole vladimirov .... Это было слишком хорошо. Большое спасибо. С Уважением. Это помогло мне решить проблему загрузки файла в корзину s3. - person Sucheta Shrivastava; 11.04.2021

В наиболее распространенном ответе выше следует отметить следующее: вам нужно вернуть проход в функции, если вы используете канал, например,

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () { 
 let pass = new stream.PassThrough();
 return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}

В противном случае он незаметно перейдет к следующему без выдачи ошибки или выдаст ошибку TypeError: dest.on is not a function в зависимости от того, как вы написали функцию.

person varun bhaya    schedule 01.10.2019

Для тех, кто жалуется, что когда они используют функцию загрузки s3 api и файл с нулевым байтом заканчивается на s3 (@ Radar155 и @gabo), у меня тоже была эта проблема.

Создайте второй поток PassThrough и просто перенаправьте все данные от первого ко второму и передайте ссылку на этот второй потоку s3. Вы можете сделать это двумя разными способами - возможно, грязный способ - прослушать событие «данные» в первом потоке и затем записать те же данные во второй поток - аналогично для события «конца» - просто вызовите конечная функция во втором потоке. Я понятия не имею, является ли это ошибкой в ​​aws api, версией узла или какой-либо другой проблемой, но для меня это сработало.

Вот как это может выглядеть:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});
person Tim    schedule 11.01.2019
comment
Это действительно сработало и для меня. Функция загрузки S3 просто молча умирала всякий раз, когда использовалась составная загрузка, но при использовании вашего решения она работала нормально (!). Спасибо! :) - person jhdrn; 09.03.2019
comment
Можете рассказать, зачем нужен второй поток? - person noob7; 12.07.2019

Следуя другим ответам и используя последний AWS SDK для Node.js, существует гораздо более чистое и простое решение, поскольку функция s3 upload () принимает поток, используя синтаксис ожидания и обещание S3:

var model = await s3Client.upload({
    Bucket : bucket,
    Key : key,
    ContentType : yourContentType,
    Body : fs.createReadStream(path-to-file)
}).promise();
person emich    schedule 14.09.2020
comment
Это работает для конкретного варианта использования чтения очень большого файла, упомянутого автором, но другие ответы все еще действительны, если вы используете потоки вне контекста файла (например, пытаетесь записать поток курсора mongo в s3, где вы по-прежнему нужно использовать поток PassThrough + pipe) - person Ken Colton; 07.01.2021

Если это поможет кому-то, кому я смог успешно передать поток с клиента на s3:

https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

Серверный код предполагает, что req является объектом потока, в моем случае он был отправлен от клиента с информацией о файле, установленной в заголовках.

const fileUploadStream = (req, res) => {
  //get "body" args from header
  const { id, fn } = JSON.parse(req.get('body'));
  const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
  const params = {
    Key,
    Bucket: bucketName, //set somewhere
    Body: req, //req is a stream
  };
  s3.upload(params, (err, data) => {
    if (err) {
      res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
    } else {
      res.send(Key);
    }
  });
};

Да, это нарушает конвенцию, но если вы посмотрите на суть, это намного чище, чем все, что я нашел с помощью multer, busboy и т. Д.

+1 за прагматизм и спасибо @SalehenRahman за его помощь.

person mattdlockyer    schedule 25.04.2017
comment
multer, busboy обрабатывают загрузку нескольких частей / данных формы. req как поток работает, когда клиент отправляет буфер как тело из XMLHttpRequest. - person André Werlang; 25.07.2017
comment
Чтобы уточнить, загрузка выполняется из бэкенда, а не с клиента, верно? - person numX; 13.08.2020
comment
Да, он передает поток на бэкэнд, но он исходит из внешнего интерфейса. - person mattdlockyer; 14.08.2020

Я использую KnexJS, и у меня возникла проблема с использованием их потокового API. Я наконец исправил это, надеюсь, следующее поможет кому-то.

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();

knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());

const uploadResult = await s3
  .upload({
    Bucket: 'my-bucket',
    Key: 'stream-test.txt',
    Body: passThroughStream
  })
  .promise();
person TestWell    schedule 02.07.2019

Если вам известен размер потока, вы можете использовать minio-js для загрузки потока следующим образом:

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
    if (e) {
      return console.log(e)
    }
    console.log("Successfully uploaded the stream")
  })
person Krishna Srinivas    schedule 21.05.2016