Обратите внимание: здесь я использую интервал, чтобы имитировать способность писателя читать или нет. Вы можете сделать это так, как хотите, т.е. если писатель вернет false, вы обновите состояние, чтобы начать буферизацию и т. д. Я думаю, что последняя строка — это то, что вы хотите, т.е.
r.pipe(b).pipe(w);
Это читается следующим образом
readStrem.pipe(transformBbuffer).pipe(writeStream);
В примере кода есть некоторые изменения, которые мы можем внести для буферизации всех данных. Опишу после кода. Все, что вам нужно знать о потоках, и многое другое находится в документации, я думаю, что они могли бы сделать с более полными примерами, но они довольно хороши и так...
https://nodejs.org/api/stream.html#stream_class_stream_transform_1
Это код.
var fs = require('fs');
var stream = require('stream')
const util = require('util');
//const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
var check_buff = 0;
var DRAIN_ME = 0;
var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');
var BufferStream = function () {
stream.Transform.apply(this, arguments);
this.buffer = [];
};
util.inherits(BufferStream, stream.Transform);
var intId;
intId = setInterval(function(){
if(check_buff % 3 == 0) {
DRAIN_ME = 1;
return;
}
DRAIN_ME = 0;
},10);
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
while(DRAIN_ME > 0 && this.buffer.length > 0) {
this.push(this.buffer.shift());
}
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
var b = new BufferStream();
b.on('end', function(chunk) {
clearInterval(intId);
});
r.pipe(b).pipe(w);
Я ищу канонический способ реализовать трансформирующий/сквозной поток, который буферизует все данные до тех пор, пока на нем не будет вызван канал.
Внесите следующие изменения
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.buffer.push(String(chunk));
console.log(chunk.length);
console.log(this.buffer.length);
done();
};
......
BufferStream.prototype._flush = function (cb) {
var len = this.buffer.length;
for (var i = 0; i < len; i++) {
this.push(this.buffer.shift());
};
cb();
};
Вы также можете приостановить доступный для чтения поток, который фактически приостановит доступный для записи поток, потому что он перестанет получать данные, т.е....
Чтобы проверить это, создайте довольно большой файл на диске, например, 100 МБ или более, и запустите это...
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
readableStream.on('data', function(chunk) {
var ready = 0;
readableStream.pause();
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
writableStream.write(chunk);
});
Причина немедленной паузы в том, что к моменту запуска интервала 10ms
файл уже мог быть записан. Есть вариации на этот счет, т.
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
var ready = 0;
setInterval(function(){
if(ready == 0) {
//console.log('pausing');
readableStream.pause();
ready = 1;
}
else {
//console.log('resuming');
readableStream.resume();
ready = 0;
}
},100);
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
readableStream.pause();
});
person
Harry
schedule
20.04.2016