// Потоки предназначены для обработки данных в виде строк или двоичных данных в виде буферов, но также можно обрабатывать и данные в виде объектов
// Работа streams основана на работе EventEmitter.
// При создании события on('data') поток чтения работает в автоматическом режиме
// При создании события on('readable') поток чтения работает в пошаговом режиме и требует ручного вызова метода read() для чтения данных
// Поток чтения в режиме 'readable' изначально поставлен на паузу и работает в пошаговом режиме, но его можно переключить в автоматический режим
// Для этого необъодимо добавить обработчик события on('data') и снять поток чтения с режима паузы с помощью метода resume() или просто использовать метод pipe()
// Для обратного переключения из автоматического режима в пошаговый необходимо вызвать метод pause(), удалить обработчик события on('data') или просто использовать метод unpipe()
// Для дуплексных потоков необходимо определить и метод _read и метод _write
// События клиента и сервера это тоже streams.
// Используются 4 основных события:
// on('data') или on('readable') - когда идет поток чтения данных - 'data' - считывает данные в автоматическом режиме, 'readable' считывает данные в ручном пошаговом режиме
// on('end') - когда поток чтения данных завершен
// on('error') - если во время чтения или записи данных происходит ошибка
// on('finish') - когда завершен поток записи данных, вызывается после вызова метода writeStream.end()
// И дополнительное событие
// on('drain') - когда данные записаны из буфера чтения
// on('close') - кода поток завершен
// Методы:
// readStream.push('data') - для помещения данных в буфер потока чтения
// readStream.unshift() - заталкивает (возвращает) часть данных обратно во внутренний буфер, если они не были приняты, например, потоком чтения, и потому могут быть переданы в другой поток
// readStream.read(10) - для считывания данных из потока чтения только в пошаговом режиме, когда поток чтения поставлен на паузу
// readStream.read(0) - всегда возвращает null и используется только для обновления (refresh) потока чтения данных
// writeStream.write('data') - для записи данных в потоке записи,возвращает true, если внутренний буфер еще не переполнен и false, если внутренний буфер уже переполненн данными для записи
// writeStream.end() - для завершения записи данных в потоке записи
// readStream.pause() - для остановки чтения данных из потока чтения и перехода в пошаговый режим чтения данных, если мы находимся в режиме чтения с помощью событий
// readStream.resume() - для запуска потока чтения с помощью событий, если мы находимся в пошаговом ражиме чтения данных
// readStream.isPaused() - проверяет поставлен ли поток чтения на паузу, используется в основе метода pipe() и в других случаях обычно не используется
// readStream.pipe(writeStream) - осуществляет передачу данных из потока в поток без прерывания, возвращает последний переданный ей поток, что позволяет создавать цепочки потоков
// readStream.unpipe(writeStream) - останавливает передачу данных из потка чтения в поток записи
// writeStream.cork() - записывает все данные для записи из внутреннего буфера в память
// writeStream.uncork() - вынимает все данные для записи из внутреннего буфера памяти
// writeStream.setDefaultEncoding() - устанаdливает кодировку по умолчанию для трансформации частей с буферами в строки
// readStream.setEncoding() - устанавливает кодировку для данных потока чтения
// readStream.destroy() - уничтожает поток чтения и вызывает событие с передачей ему сообщения об ошибке. При использовании необходимо внутри класса создать метод _destroy()
// writeStream.destroy() - уничтожает поток записи и вызывает событие с передачей ему сообщения об ошибке. При использовании необходимо внутри класса создать метод _destroy()
// transformStream.destroy() - уничтожает поток трансформации и вызывает событие с передачей ему сообщения об ошибке. При использовании необходимо внутри класса создать метод _destroy()
// Для определения момента завершения чтения и записи данных можно указать в pipe опцию {end: false} и задать свой слушатель события on('end')
// readStream.pipe(writeStream, {end: false});
// readStream.on('end', function () {
// writeStream.end('Goodbye');
// });
const stream = require('stream');
// Readable stream
(function(){
class ReadableStream extends stream.Readable {
// Опции, которые можно передать в поток чтения:
// {
// highWaterMark: 16 // максимальное число байт, которое можно хранить во внутреннем буфере до прекращения чтения или число объектов
// , encoding: 'utf8' // null или строка с кодировкой, согласно которой буферы будут декодированы в строки во время чтения
// , objectmode: false // вместо передачи данных в виде буферов или строк можно передавать для читения объекты, в этом случае данная опция должна быть равна true
// }
constructor(content, options) {
super(options); // Опции потока чтения
this.content = content; // Переданное в поток чтения содержимое
}
// Для создания потока чтения обязательно нужно реализовать свой внутренний метод _read()
// Метод _read() внутри себя должен обязательно вызывать метод this.push() для передачи данных во внутренний буфер потока чтения
_read (size) { // size - размер части данных в байтах, может быть не задан, если передаются данные в виде объектов
if (this.content) { // Если осталось еще необработанное содержимое
this.push(this.content.slice(0, size)); // Заталкиваем новую часть данных во внутренний буфер потока чтения
this.content = this.content.slice(size); // Отрезаем часть уже обработанного содержимого
} else {
this.push(null); // Заталкиваем null во внутренний буфер потока чтения для обозначения того, что поток чтения завершен
}
// Если в процессе чтения данных возникнет ошибка, то лучше её выбросить с помощью события
// if (this.content.slice(0, size) !== 'abc') {
// process.nextTick(function () {process.emit('error', 'Read stream error')});
// return;
// }
}
}
const readStream = new ReadableStream("The quick brown fox jumps over the lazy dog."); // Передаем в поток чтения исходные данные для обработки
readStream.setEncoding('utf8'); // устанваливает кодировку для данных в потоке чтения
const chunkSize = 10; // Устанавливаем размер части данных для обработки
let chunk = readStream.read(chunkSize); // Считываем часть данных
while (chunk !== null) { // Если считанные данные не равны null, то поток чтения еще не закончен
console.log(chunk.toString()); // Выводим считанную часть данных в конксоль
chunk = readStream.read(chunkSize); // Считываем следующую часть данных
}
// The quick
// brown fox
// jumps over
// the lazy
// dog.
const otherReadStream = new ReadableStream("How now brown cow?");
otherReadStream.pipe(process.stdout); // Автоматически выводим все считанные данные в консоль
// How now brown cow
// Для чтения данных с помощью потока чтения можно использовать следующие функции
// Для управляемого пошагового чтения данных из потока чтения
// let chunk = readStream.read(chunkSize);
// while (chunk !== null) {
// console.log(chunk.toString());
// chunk = readStream.read(chunkSize);
// }
// Для автоматического чтения данных из потока чтения с помощью событий
// readStream.on('data', function (chunk) {console.log(chunk.toString());});
// readStream.on('end', function () {console.log('Read stream end');});
// Для передачи данных из потока чтения в другой поток
// readStream.pipe(target);
// Для определения момента завершения чтения и записи данных можно указать в pipe опцию {end: false} и задать свой слушатель события on('end')
// readStream.pipe(writeStream, {end: false});
// readStream.on('end', function () {
// writeStream.end('Goodbye');
// });
// Дополнительные события для потока чтения
// readStream.on('close', function () {console.log('Read stream closed');});
// readStream.on('error', function (error) {console.log('Read stream error: ' + error.message);});
})();
// Пример чтения потока данных, состоящих из объектов
(function(){
const data = [
{id: 1, name: 'object 1', value: 1}
, {id: 2, name: 'object 2', value: 2}
, {id: 3, name: 'object 3', value: 3}
]
class ReadStream extends stream.Readable {
constructor (content, options) {
super(options);
this.content = content;
this.curIndex = 0;
}
// Метод _read() определяет какие данных надо положить в очередь для чтения с помощью метода push() и дальнейшей передачи их потребителю данных
// Когда данные для чтения заканчиваются данный метод должен положить в очередь чтения null с помощью метода push(null), который сообщит потребителю данных, что поток чтения завершился
_read () {
if (this.curIndex < this.content.length) {
const chunk = this.content[this.curIndex++];
this.push(chunk);
} else {
this.push(null);
}
}
}
// Для чтения данных в виде объектов вместо строк и буферов обязательно надо прописать опцию {objectMode: true}
const readStream = new ReadStream(data, {objectMode: true});
/*
// При возникновении события 'readable' мы значем, что данные уже находятся во внутреннем буфере и готовы для пошагового чтения с помощью метода read()
readStream.on('readable', function() {
let chunk = readStream.read();
while (chunk !== null) {
console.log('Read received: ' + JSON.stringify(chunk));
chunk = readStream.read();
}
});
*/
// При добавлении события 'data' данные автоматически считываются и передаются потребителю
// В данном режиме можно ставить поток на паузу, а потом его возобновлять
// Как только будет вызван метод pause() поток чтения перестанет принимать данные по событию 'data' до тех пор, пока не будет вызван метод resume()
// Однако поток чтения продолжит в этом время заталкивать данные в очередь буфера чтения
readStream.on('data', function (chunk) {
console.log('Received chunk: ' + JSON.stringify(chunk));
console.log('Pausing read stream for 2 seconds');
readStream.pause();
setTimeout(function () {
console.log('Resuming stream');
readStream.resume();
}, 2000);
});
readStream.on('end', function() {
console.log('Read done');
});
})();
// Writable stream
(function(){
const readStream = new stream.Readable(); // Создаем поток чтения
readStream.push('the quick brown fox '); // Заталкиваем во внутренний буфер потока чтения части данных
readStream.push('jumps over the lazy dog.');
readStream.push(null); // Заталкиваем null во внутренний буфер потока чтения для обозначения того, что поток чтения завершен
class WritableStream extends stream.Writable {
// Опции, которые можно передать в поток записи:
// {
// highWaterMark: 16 // максимальное число байт, которое можно хранить во внутреннем буфере до прекращения чтения или число объектов
// , decodeStrings: true // cледует ли декодировать строки в буфер, прежде чем передавать их во внутренний метод _write()
// }
constructor (options) {
super(options); // Опции потока записи данных
}
// Для создания потока записи обязательно нужно реализовать свой внутренний метод _write
// Метод _write определяет куда надо направить данные
// chunk - часть данных, которую надо записать во время потока записи
// encoding - кодировка части данных
// next - функция, которую нужно будет вызывать каждый раз для перехода к следующей порции данных для записи, она сообщает, что текущая часть данных записана и можно брать следующую
_write (chunk, encoding, next) {
console.log(chunk.toString()); // Выводим переданные части данных в консоль
console.log('Waiting 2 seconds');
setTimeout(function() {
console.log('Finished waiting');
next(); // Вызов функции next() сообщает, что можно переходить к записи следующей части данных
// Поток записи не будет получать новые данные, пока функци next() не будет вызвана
// Функцию next можно вызывать с ошибкой, если в данных обнаружена ошикба: next(new Error('chunk is invalid'));
// Пример:
// if (chunk.toString().indexOf('a') >= 0) {
// next(new Error('chunk is invalid'));
// } else {
// next();
// }
}, 2000);
}
}
const writeStream = new WritableStream(); // Создаем поток записи
readStream.pipe(writeStream); // Автоматически передаем данные из потока чтения в поток записи
// Для записи данных с помощью потока записи можно использовать следующие функции
// Для управляемого пошаговой записи данных из потока чтения
// let chunk = readStream.read(chunkSize);
// while (chunk !== null) {
// writableStream.write(chunk, 'utf8');
// chunk = readStream.read(chunkSize);
// }
// writableStream.end();
// Для автоматической записи данных из потока чтения с помощью событий
// readStream.on('data', function (chunk) {writableStream.write(chunk);});
// readStream.on('end', function () {writableStream.end();});
// Для передачи данных из потока чтения в поток записи
// readStream.pipe(writableStream);
// Дополнительные события для потока записи
// writableStream.on('close', function () {console.log('Write stream closed');});
// Во время работы потока записи данные поступают в его внутренний буфер
// Данный буфер в какой-то момент может быть заполнен данными до предела
// На этот случай поток поступаления данных для записи приостанавливают
// Возобновить дальнейшее получение данных можно будет по событию 'darain' как только буфер потока записи очистится
// writableStream.once('drain', function () {readableStream.resume();});
// writableStream.on('error', function (error) {console.log('Write stream error: ' + error.message);});
// Это событие для потока записи аналогично событию on('end') для потока чтения
// writableStream.on('finish', function () {console.log('Write stream end');});
})();
// Transform stream
(function(){
const readStream = new stream.Readable(); // Создаем поток чтения
readStream.push('the quick brown fox '); // Заталкиваем во внутренний буфер потока чтения части данных
readStream.push('jumps over the lazy dog.');
readStream.push(null); // Заталкиваем null во внутренний буфер потока чтения для обозначения того, что поток чтения завершен
class TransformStream extends stream.Transform {
constructor (options) {
super(options); // Опции потока трансформации данных
}
// Для создания потока трансформации обязательно нужно реализовать свой внутренний метод _transform
// Метод _transform() внутри себя должен обязательно вызывать метод this.push() для передачи данных потребителю данных
// next - функция, которую нужно будет вызывать каждый раз для перехода к следующей порции данных для трансформации
_transform (chunk, encoding, next) {
const data = chunk.toString().toUpperCase(); // Трансформируем переданные в поток трансформации части данных
this.push(data); // Заталкиваем трансформированную часть данных во внутренний буфер потока трансформации
next();
}
// Опционально можно реализовать внутренний метод _flush()
// Данный метод будет вызван в конце потока, когда вся передача данных завершится
// Его можно использовать для очистки буферов после выполнения все работы
_flush (next) {
next();
}
}
const transformStream = new TransformStream(); // Создаем поток трансформации
readStream.pipe(transformStream).pipe(process.stdout); // Автоматически передаем данные из потока чтения в поток трансформации и выводим их в консоль
// THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
// Для трансформации данных с помощью потока трансформации можно использовать следующие функции
// Для автоматической трансформации данных из потока чтения с помощью событий
// readStream.on('data', function (chunk) {...);
// readStream.on('end', function () {...});
// Для передачи данных из потока чтения в поток трансформации и далее в поток записи
// readStream.pipe(transformStream).pipe(writableStream);
})();
// Пример трансформации потока данных, состоящих из объектов
(function(){
const data = [
{id: 1, name: 'object 1', value: 1}
, {id: 2, name: 'object 2', value: 2}
, {id: 3, name: 'object 3', value: 3}
]
class ReadStream extends stream.Readable {
constructor (content, options) {
super(options);
this.content = content;
this.curIndex = 0;
}
_read () {
if (this.curIndex < this.content.length) {
const chunk = this.content[this.curIndex++];
this.push(chunk);
} else {
this.push(null);
}
}
}
class WritableStream extends stream.Writable {
constructor (options) {
super(options);
}
_write (chunk, encoding, next) {
console.log('After transform: ' + JSON.stringify(chunk));
next();
}
}
class TransformStream extends stream.Transform {
constructor (options) {
super(options);
}
_transform (chunk, encoding, next) {
console.log('Transform before: ' + JSON.stringify(chunk));
if (typeof chunk.originalValue === 'undefined') {
chunk.originalValue = chunk.value;
chunk.value++;
}
console.log('Transform after: ' + JSON.stringify(chunk));
if (chunk.originalValue !== 1) {
this.push(chunk);
}
next();
}
}
const readStream = new ReadStream(data, {objectMode: true});
const writeStream = new WritableStream({objectMode: true});
// Для трансформации данных в виде объектов вместо строк и буферов обязательно надо прописать опцию {objectMode: true}
const transformStream = new TransformStream({objectMode: true});
readStream.pipe(transformStream).pipe(writeStream);
})();
// Duplex stream
(function(){
class DuplexStream extends stream.Duplex {
constructor (source, options) {
super(options);
this.source = source;
}
_read (size) {
this.source.fetchSomeData(size, function (data, encoding) {
this.push(Buffer.from(data, encoding));
});
}
_write (chunk, encoding, next) {
// Данный пример с данными обрабытывает только строки
if (Buffer.isBuffer(chunk)) {
chunk = chunk.toString();
}
this.source.writeSomeData(chunk);
next();
}
}
})();
// Пример дуплексного потока данных, состоящих из объектов
(function(){
// Все Transform streams являются Duplex Streams
const myTransform = new stream.Transform(
{
writableObjectMode: true
, transform: function (chunk, encoding, next) {
// При необходимости привести часть данных к числу
chunk |= 0;
// Трансформировать часть данных во что-то ещё
const data = chunk.toString(16);
// Затолкнуть данные в очередь для чтения
next(null, '0'.repeat(data.length % 2) + data);
}
}
);
myTransform.setEncoding('ascii');
myTransform.on('data', function (chunk) {console.log(chunk);});
myTransform.write(1); // Prints: 01
myTransform.write(10); // Prints: 0a
myTransform.write(100); // Prints: 64
})();
// Пример генерации данных в потоке чтения и записи их в потоке записи
(function(){
const readableSream = stream.Readable();
const writableStream = stream.Writable();
let char = 97;
readableSream._read = function () { // Данный метод вызывается в цикле чтения потока данных
readableSream.push(
String.fromCharCode(char++) // Помещение в буфер чтения данных
);
if (char > 'z'.charCodeAt(0)) {
readableSream.push(null); // Помещение в буфер чтения метки null, сообщающей о завершении потока чтения
}
};
writableStream._write = function (chunk, encoding, next) { // Данный метод вызывается в цикле записи потока данных
console.log(chunk.toString('utf8')); // Вывод данных для записи в консоль
next();
};
readableSream.on('data', function (chunk) {
writableStream.write(chunk);
});
readableSream.on('end', function () {
writableStream.end();
});
writableStream.on('finish', function () {
console.log('Writable stream end');
});
// Помещение данных в буфер потока чтения
readableSream.push('1');
readableSream.push('2');
readableSream.push('3');
// Передача данных в поток чтения, минуя бувер чтения с помощью событий
readableSream.emit('data', '4');
readableSream.emit('data', '5');
readableSream.emit('data', '6');
// Помещение в буфер потока чтения пометки null, говорящей потоку чтения о том, что передача данных завершена
readableSream.push(null); // Readable stream end
console.log('Readable stream end');
// readableSream.pipe(writableStream); // Всё выше перечисленное можно записать в одну эту строку.
})();
// Потоковое чтение и потоковая запись данных из файлов
(function(){
const fs = require('fs');
const readStream = fs.createReadStream('./file1.txt');
const writeStream = fs.createWriteStream('./file2.txt');
readStream.setEncoding('utf8');
// Первый вариант чтения данных с помощью события 'data'
// Отличается от события 'readable' тем, что переводит поток в автоматический режим чтения данных
readStream.on('data', function (chunk) {
console.log('Read stream data: ' + chunk.toString());
const ready = writeStream.write(chunk, 'utf8'); // Если внутренниц буфер потока записи уже заполнен до конца, то write() вернет false и процесс поступления новых данных на время остановится
if (ready === false) {
this.pause();
writeStream.once('drain', this.resume.bind(this)); // событие 'drain' сообщает, что запись части данных завершена и можно переходить к чтению и записи следующей части данных
}
});
// Альтернативный вариант чтения данных с помощью события 'readable'
// Отличается от события 'data' тем, что переводит поток в ручной режим чтения данных
readStream.on('readable', function () {
let chunk = readStream.read();
while (chunk !== null) {
console.log('Read stream data: ' + chunk.toString());
writeStream.write(chunk, 'utf8');
chunk = readStream.read();
}
});
readStream.on('end', function () {
console.log('Read stream end');
writeStream.end();
});
writeStream.on('finish', function () {
console.log('Write stream end');
});
readStream.on('error', function (error) {
console.log('Read stream error: ' + error.message);
});
writeStream.on('error', function (error) {
console.log('Write stream error: ' + error.message);
});
// readStream.pipe(writeStream); // Всё выше перечисленное можно записать в одну эту строку.
})();
Комментариев нет:
Отправить комментарий