Потоковая передача данных из Cassandra в файл с учетом противодавления

У меня есть приложение Node, которое собирает голоса и сохраняет их в Cassandra. Голоса хранятся в виде зашифрованных строк в кодировке base64. У API есть конечная точка с именем /export, которая должна получить все эти строки голосов (возможно, > 1 миллиона), преобразовать их в двоичные файлы и добавить их одну за другой в файл voices.egd. Затем этот файл следует заархивировать и отправить клиенту. Моя идея состоит в том, чтобы передавать строки из Cassandra, преобразовывая каждую строку голосования в двоичный файл и записывая в WriteStream. Я хочу обернуть эту функциональность в Promise для удобства использования. У меня есть следующее:

streamVotesToFile(query, validVotesFileBasename) {
  return new Promise((resolve, reject) => {
    const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);

    writeStream.on('error', (err) => {
      logger.error(`Writestream ${validVotesFileBasename}.egd error`);
      reject(err);
    });

    writeStream.on('drain', () => {
      logger.info(`Writestream ${validVotesFileBasename}.egd error`);
    })

    db.client.stream(query)
    .on('readable', function() {
      let row = this.read();
      while (row) {
        const envelope = new Buffer(row.vote, 'base64');
        if(!writeStream.write(envelope + '\n')) {
          logger.error(`Couldn't write vote`);
        }
        row = this.read()
      }
    })
    .on('end', () => { // No more rows from Cassandra
      writeStream.end();
      writeStream.on('finish', () => {
        logger.info(`Stream done writing`);
        resolve();
      });
    })
    .on('error', (err) => { // err is a response error from Cassandra
      reject(err);
    });
  });
}

Когда я запускаю это, он добавляет все голоса в файл и загружается нормально. Но есть куча проблем/вопросов:

  1. Если я сделаю запрос к конечной точке /export и эта функция запустится, в то время как она работает, все остальные запросы к приложению будут очень медленными или просто не завершатся до выполнения запроса на экспорт. Я предполагаю, что цикл событий перегружен всеми этими событиями из потока Cassandra (тысячи в секунду)?

  2. Кажется, что все голоса записываются в файл нормально, но я получаю false почти за каждый вызов writeStream.write() и вижу соответствующее зарегистрированное сообщение (см. Код)?

  3. Я понимаю, что мне нужно учитывать обратное давление и событие «слива» для WritableStream, поэтому в идеале я бы использовал pipe() и передал голоса в файл, потому что в нем встроена поддержка обратного давления (правильно?), но поскольку мне нужно обрабатывать каждую строку ( преобразовать в двоичный и, возможно, добавить другие данные из других полей строки в будущем), как бы я сделал это с каналом?


person gcosta    schedule 03.03.2017    source источник


Ответы (1)


Это идеальный вариант использования TransformStream:

const myTransform = new Transform({
  readableObjectMode: true,
  transform(row, encoding, callback) {
    // Transform the row into something else
    const item = new Buffer(row['vote'], 'base64');
    callback(null, item);
  }
});

client.stream(query, params, { prepare: true })
  .pipe(myTransform)
  .pipe(fileStream);

Дополнительную информацию о реализации TransformStream см. в Документации по Node.js API.

person jorgebg    schedule 03.03.2017
comment
Имеет смысл. Но если я использую ваш код ^^, я получаю сообщение об ошибке: TypeError: Invalid non-string/buffer chunk at validChunk (_stream_writable.js:216:10) at Transform.Writable.write (_stream_writable.js:245:12) at ResultStream .ondata (_stream_readable.js:555:20) в emitOne (events.js:96:13) в ResultStream.emit (events.js:188:7) в ResultStream.Readable.read (_stream_readable.js:381:10) - person gcosta; 03.03.2017
comment
Ах, myTransform должен быть потоком Transform в объектном режиме, поскольку клиентский поток возвращает объект Readable Streams2 в объектном режиме. (установите для objectMode значение true). Спасибо! - person gcosta; 03.03.2017