У меня есть приложение Node, которое собирает голоса и сохраняет их в Cassandra. Голоса хранятся в виде зашифрованных строк в кодировке base64. У API есть конечная точка с именем /export
, которая должна получить все эти строки голосов (возможно, > 1 миллиона), преобразовать их в двоичные файлы и добавить их одну за другой в файл voices.egd. Затем этот файл следует заархивировать и отправить клиенту. Моя идея состоит в том, чтобы передавать строки из Cassandra, преобразовывая каждую строку голосования в двоичный файл и записывая в WriteStream
. Я хочу обернуть эту функциональность в Promise для удобства использования. У меня есть следующее:
streamVotesToFile(query, validVotesFileBasename) {
return new Promise((resolve, reject) => {
const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);
writeStream.on('error', (err) => {
logger.error(`Writestream ${validVotesFileBasename}.egd error`);
reject(err);
});
writeStream.on('drain', () => {
logger.info(`Writestream ${validVotesFileBasename}.egd error`);
})
db.client.stream(query)
.on('readable', function() {
let row = this.read();
while (row) {
const envelope = new Buffer(row.vote, 'base64');
if(!writeStream.write(envelope + '\n')) {
logger.error(`Couldn't write vote`);
}
row = this.read()
}
})
.on('end', () => { // No more rows from Cassandra
writeStream.end();
writeStream.on('finish', () => {
logger.info(`Stream done writing`);
resolve();
});
})
.on('error', (err) => { // err is a response error from Cassandra
reject(err);
});
});
}
Когда я запускаю это, он добавляет все голоса в файл и загружается нормально. Но есть куча проблем/вопросов:
Если я сделаю запрос к конечной точке
/export
и эта функция запустится, в то время как она работает, все остальные запросы к приложению будут очень медленными или просто не завершатся до выполнения запроса на экспорт. Я предполагаю, что цикл событий перегружен всеми этими событиями из потока Cassandra (тысячи в секунду)?Кажется, что все голоса записываются в файл нормально, но я получаю
false
почти за каждый вызовwriteStream.write()
и вижу соответствующее зарегистрированное сообщение (см. Код)?Я понимаю, что мне нужно учитывать обратное давление и событие «слива» для WritableStream, поэтому в идеале я бы использовал
pipe()
и передал голоса в файл, потому что в нем встроена поддержка обратного давления (правильно?), но поскольку мне нужно обрабатывать каждую строку ( преобразовать в двоичный и, возможно, добавить другие данные из других полей строки в будущем), как бы я сделал это с каналом?