Передача данных в доступный для записи поток, который еще не готов к приему данных

Есть ли способ подключить читаемый поток к записываемому потоку в Node.js, где записываемый еще не готов к приему данных? Другими словами, я хотел бы соединить доступное для чтения и доступное для записи, но я хочу инициализировать доступное для записи, включая определение метода записи, на более позднем этапе программы. Возможно, мы должны реализовать метод записи, но есть ли способ приостановить поток, доступный для записи, аналогично тому, как вы можете приостановить поток, доступный для чтения? Или, возможно, мы можем использовать промежуточный поток сквозного/преобразования и буферизовать данные там, прежде чем передавать данные на запись!

Например, обычно мы делаем:

readable.pipe(transform).pipe(writable);

но я хочу сделать что-то вроде:

const tstrm = readable.pipe(transform);

doSomethingAsync().then(function(){

      tstrm.pipe(writable);

});

просто интересно, возможно ли это и как это сделать правильно, пока не могу понять и то, и другое.

Я предполагаю, что я хочу буферизовать данные в промежуточном потоке преобразования, прежде чем он будет подключен/передан в доступный для записи поток, а затем, после его подключения, сначала передать буферизованные данные перед любыми новыми данными. Вроде разумная вещь, не могу найти никакой информации по этому поводу.


person Alexander Mills    schedule 12.04.2016    source источник
comment
похоже, что принятый здесь ответ близок к тому, что я ищу > stackoverflow.com/questions/20317759/   -  person Alexander Mills    schedule 13.04.2016


Ответы (1)


Обратите внимание: здесь я использую интервал, чтобы имитировать способность писателя читать или нет. Вы можете сделать это так, как хотите, т.е. если писатель вернет false, вы обновите состояние, чтобы начать буферизацию и т. д. Я думаю, что последняя строка — это то, что вы хотите, т.е.

r.pipe(b).pipe(w);

Это читается следующим образом

readStrem.pipe(transformBbuffer).pipe(writeStream);

В примере кода есть некоторые изменения, которые мы можем внести для буферизации всех данных. Опишу после кода. Все, что вам нужно знать о потоках, и многое другое находится в документации, я думаю, что они могли бы сделать с более полными примерами, но они довольно хороши и так...

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

Это код.

var fs     = require('fs');
var stream = require('stream')
const util = require('util');
//const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
var check_buff  = 0;
var DRAIN_ME    = 0;

var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');

var BufferStream = function () {
  stream.Transform.apply(this, arguments);
  this.buffer = []; 
};

util.inherits(BufferStream, stream.Transform);

var intId;
intId = setInterval(function(){
  if(check_buff % 3 == 0) {
    DRAIN_ME = 1;
    return;
  }
  DRAIN_ME = 0;
},10);  

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.buffer.push(String(chunk));
  while(DRAIN_ME > 0 && this.buffer.length > 0) {
    this.push(this.buffer.shift());
  }
  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
};

var b = new BufferStream();
b.on('end', function(chunk) {
  clearInterval(intId);
});
r.pipe(b).pipe(w);

Я ищу канонический способ реализовать трансформирующий/сквозной поток, который буферизует все данные до тех пор, пока на нем не будет вызван канал.

Внесите следующие изменения

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.buffer.push(String(chunk));

  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
};
......
BufferStream.prototype._flush = function (cb) {
  var len = this.buffer.length;
  for (var i = 0; i < len; i++) {
    this.push(this.buffer.shift());
  };
  cb();
};

Вы также можете приостановить доступный для чтения поток, который фактически приостановит доступный для записи поток, потому что он перестанет получать данные, т.е....

Чтобы проверить это, создайте довольно большой файл на диске, например, 100 МБ или более, и запустите это...

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  var ready = 0;
  readableStream.pause();
  setInterval(function(){
    if(ready == 0) {
      //console.log('pausing');
      readableStream.pause();
      ready = 1;
    }   
    else {
      //console.log('resuming');
      readableStream.resume();
      ready = 0;
    }   
  },100);  
  writableStream.write(chunk);
});

Причина немедленной паузы в том, что к моменту запуска интервала 10ms файл уже мог быть записан. Есть вариации на этот счет, т.

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');

var ready = 0;
setInterval(function(){
  if(ready == 0) {
    //console.log('pausing');
    readableStream.pause();
    ready = 1;
  }
  else {
    //console.log('resuming');
    readableStream.resume();
    ready = 0;
  }
},100);  

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
  readableStream.pause();
});
person Harry    schedule 20.04.2016
comment
спасибо, да, я могу приостановить доступ к чтению, это не так сложно, но как насчет использования потока сквозного/преобразования для буферизации данных, пока мы ждем, когда доступ для записи станет готов к приему данных? - person Alexander Mills; 21.04.2016