среда, 5 июля 2017 г.

Node.js Шпаргалка по API Stream

// Потоки предназначены для обработки данных в виде строк или двоичных данных в виде буферов, но также можно обрабатывать и данные в виде объектов
// Работа streams основана на работе EventEmitter.
// При создании события on('data') поток чтения работает в автоматическом режиме
// При создании события on('readable') поток чтения работает в пошаговом режиме и требует ручного вызова метода read() для чтения данных
// Поток чтения в режиме 'readable' изначально поставлен на паузу и работает в пошаговом режиме, но его можно переключить в автоматический режим
// Для этого необъодимо добавить обработчик события on('data') и снять поток чтения с режима паузы с помощью метода resume() или просто использовать метод pipe()
// Для обратного переключения из автоматического режима в пошаговый необходимо вызвать метод pause(), удалить обработчик события on('data') или просто использовать метод unpipe()
// Для дуплексных потоков необходимо определить и метод _read и метод _write
// События клиента и сервера это тоже streams.
// Используются 4 основных события:
// on('data') или on('readable') - когда идет поток чтения данных - 'data' - считывает данные в автоматическом режиме, 'readable' считывает данные в ручном пошаговом режиме
// on('end') -  когда поток чтения данных завершен
// on('error') - если во время чтения или записи данных происходит ошибка
// on('finish') - когда завершен поток записи данных, вызывается после вызова метода writeStream.end()
// И дополнительное событие
// on('drain') - когда данные записаны из буфера чтения
// on('close') - кода поток завершен
// Методы:
// readStream.push('data') - для помещения данных в буфер потока чтения
// readStream.unshift() - заталкивает (возвращает) часть данных обратно во внутренний буфер, если они не были приняты, например, потоком чтения, и потому могут быть переданы в другой поток
// readStream.read(10) - для считывания данных из потока чтения только в пошаговом режиме, когда поток чтения поставлен на паузу
// readStream.read(0) - всегда возвращает null и используется только для обновления (refresh) потока чтения данных
// writeStream.write('data') - для записи данных в потоке записи,возвращает true, если внутренний буфер еще не переполнен и false, если внутренний буфер уже переполненн данными для записи
// writeStream.end() - для завершения записи данных в потоке записи
// readStream.pause() - для остановки чтения данных из потока чтения и перехода в пошаговый режим чтения данных, если мы находимся в режиме чтения с помощью событий
// readStream.resume() - для запуска потока чтения с помощью событий, если мы находимся в пошаговом ражиме чтения данных
// readStream.isPaused() - проверяет поставлен ли поток чтения на паузу, используется в основе метода pipe() и в других случаях обычно не используется
// readStream.pipe(writeStream) - осуществляет передачу данных из потока в поток без прерывания, возвращает последний переданный ей поток, что позволяет создавать цепочки потоков
// readStream.unpipe(writeStream) - останавливает передачу данных из потка чтения в поток записи
// writeStream.cork() - записывает все данные для записи из внутреннего буфера в память
// writeStream.uncork() - вынимает все данные для записи из внутреннего буфера памяти
// writeStream.setDefaultEncoding() - устанаdливает кодировку по умолчанию для трансформации частей с буферами в строки
// readStream.setEncoding() - устанавливает кодировку для данных потока чтения
// readStream.destroy() - уничтожает поток чтения и вызывает событие с передачей ему сообщения об ошибке. При использовании необходимо внутри класса создать метод _destroy()
// writeStream.destroy() - уничтожает поток записи и вызывает событие с передачей ему сообщения об ошибке. При использовании необходимо внутри класса создать метод _destroy()
// transformStream.destroy() - уничтожает поток трансформации и вызывает событие с передачей ему сообщения об ошибке. При использовании необходимо внутри класса создать метод _destroy()
// Для определения момента завершения чтения и записи данных можно указать в pipe опцию {end: false} и задать свой слушатель события on('end')
// readStream.pipe(writeStream, {end: false});
// readStream.on('end', function () {
//     writeStream.end('Goodbye');
// });

const stream = require('stream');

// Readable stream

(function(){

class ReadableStream extends stream.Readable {
    // Опции, которые можно передать в поток чтения:
    // {
    //        highWaterMark: 16 // максимальное число байт, которое можно хранить во внутреннем буфере до прекращения чтения или число объектов
    //      , encoding: 'utf8' // null или строка с кодировкой, согласно которой буферы будут декодированы в строки во время чтения
    //      , objectmode: false // вместо передачи данных в виде буферов или строк можно передавать для читения объекты, в этом случае данная опция должна быть равна true
    // }
    constructor(content, options) {
        super(options); // Опции потока чтения
        this.content = content; // Переданное в поток чтения содержимое
    }
    // Для создания потока чтения обязательно нужно реализовать свой внутренний метод _read()
    // Метод _read() внутри себя должен обязательно вызывать метод this.push() для передачи данных во внутренний буфер потока чтения
    _read (size) { // size - размер части данных в байтах, может быть не задан, если передаются данные в виде объектов
        if (this.content) { // Если осталось еще необработанное содержимое
            this.push(this.content.slice(0, size)); // Заталкиваем новую часть данных во внутренний буфер потока чтения
            this.content = this.content.slice(size); // Отрезаем часть уже обработанного содержимого
        } else {
            this.push(null); // Заталкиваем null во внутренний буфер потока чтения для обозначения того, что поток чтения завершен
        }
        // Если в процессе чтения данных возникнет ошибка, то лучше её выбросить с помощью события
        // if (this.content.slice(0, size) !== 'abc') {
        //     process.nextTick(function () {process.emit('error', 'Read stream error')});
        //     return;
        // }
    }
}

const readStream = new ReadableStream("The quick brown fox jumps over the lazy dog."); // Передаем в поток чтения исходные данные для обработки
readStream.setEncoding('utf8'); // устанваливает кодировку для данных в потоке чтения
const chunkSize = 10; // Устанавливаем размер части данных для обработки
let chunk = readStream.read(chunkSize); // Считываем часть данных
while (chunk !== null) { // Если считанные данные не равны null, то поток чтения еще не закончен
    console.log(chunk.toString()); // Выводим считанную часть данных в конксоль
    chunk = readStream.read(chunkSize); // Считываем следующую часть данных
}

// The quick
// brown fox
// jumps over
//  the lazy
// dog.

const otherReadStream = new ReadableStream("How now brown cow?");
otherReadStream.pipe(process.stdout); // Автоматически выводим все считанные данные в консоль

// How now brown cow

// Для чтения данных с помощью потока чтения можно использовать следующие функции

// Для управляемого пошагового чтения данных из потока чтения
// let chunk = readStream.read(chunkSize);
// while (chunk !== null) {
//     console.log(chunk.toString());
//     chunk = readStream.read(chunkSize);
// }

// Для автоматического чтения данных из потока чтения с помощью событий
// readStream.on('data', function (chunk) {console.log(chunk.toString());});
// readStream.on('end', function () {console.log('Read stream end');});

// Для передачи данных из потока чтения в другой поток
// readStream.pipe(target);

// Для определения момента завершения чтения и записи данных можно указать в pipe опцию {end: false} и задать свой слушатель события on('end')
// readStream.pipe(writeStream, {end: false});
// readStream.on('end', function () {
//     writeStream.end('Goodbye');
// });

// Дополнительные события для потока чтения

// readStream.on('close', function () {console.log('Read stream closed');});

// readStream.on('error', function (error) {console.log('Read stream error: ' + error.message);});

})();

// Пример чтения потока данных, состоящих из объектов

(function(){

const data = [
      {id: 1, name: 'object 1', value: 1}
    , {id: 2, name: 'object 2', value: 2}
    , {id: 3, name: 'object 3', value: 3}
]

class ReadStream extends stream.Readable {
    constructor (content, options) {
        super(options);
        this.content = content;
        this.curIndex = 0;
    }
    // Метод _read() определяет какие данных надо положить в очередь для чтения с помощью метода push() и дальнейшей передачи их потребителю данных
    // Когда данные для чтения заканчиваются данный метод должен положить в очередь чтения null с помощью метода push(null), который сообщит потребителю данных, что поток чтения завершился
    _read () {
        if (this.curIndex < this.content.length) {
            const chunk = this.content[this.curIndex++];
            this.push(chunk);
        } else {
            this.push(null);
        }
    }
}

// Для чтения данных в виде объектов вместо строк и буферов обязательно надо прописать опцию {objectMode: true}
const readStream = new ReadStream(data, {objectMode: true});
/*
// При возникновении события 'readable' мы значем, что данные уже находятся во внутреннем буфере и готовы для пошагового чтения с помощью метода read()
readStream.on('readable', function() {
    let chunk = readStream.read();
    while (chunk !== null) {
        console.log('Read received: ' + JSON.stringify(chunk));
        chunk = readStream.read();
    }
});
*/
// При добавлении события 'data' данные автоматически считываются и передаются потребителю
// В данном режиме можно ставить поток на паузу, а потом его возобновлять
// Как только будет вызван метод pause() поток чтения перестанет принимать данные по событию 'data' до тех пор, пока не будет вызван метод resume()
// Однако поток чтения продолжит в этом время заталкивать данные в очередь буфера чтения
readStream.on('data', function (chunk) {
    console.log('Received chunk: ' + JSON.stringify(chunk));
    console.log('Pausing read stream for 2 seconds');
    readStream.pause();
    setTimeout(function () {
        console.log('Resuming stream');
        readStream.resume();
    }, 2000);
});

readStream.on('end', function() {
    console.log('Read done');
});

})();

// Writable stream

(function(){

const readStream = new stream.Readable(); // Создаем поток чтения
readStream.push('the quick brown fox '); // Заталкиваем во внутренний буфер потока чтения части данных
readStream.push('jumps over the lazy dog.');
readStream.push(null); // Заталкиваем null во внутренний буфер потока чтения для обозначения того, что поток чтения завершен

class WritableStream extends stream.Writable {
    // Опции, которые можно передать в поток записи:
    // {
    //        highWaterMark: 16 // максимальное число байт, которое можно хранить во внутреннем буфере до прекращения чтения или число объектов
    //      , decodeStrings: true // cледует ли декодировать строки в буфер, прежде чем передавать их во внутренний метод _write()
    // }
    constructor (options) {
        super(options);  // Опции потока записи данных
    }
    // Для создания потока записи обязательно нужно реализовать свой внутренний метод _write
    // Метод _write определяет куда надо направить данные
    // chunk - часть данных, которую надо записать во время потока записи
    // encoding - кодировка части данных
    // next - функция, которую нужно будет вызывать каждый раз для перехода к следующей порции данных для записи, она сообщает, что текущая часть данных записана и можно брать следующую
    _write (chunk, encoding, next) {
        console.log(chunk.toString()); // Выводим переданные части данных в консоль
        console.log('Waiting 2 seconds');
        setTimeout(function() {
            console.log('Finished waiting');
            next(); // Вызов функции next() сообщает, что можно переходить к записи следующей части данных
            // Поток записи не будет получать новые данные, пока функци next() не будет вызвана
            // Функцию next можно вызывать с ошибкой, если в данных обнаружена ошикба: next(new Error('chunk is invalid'));
            // Пример:
            // if (chunk.toString().indexOf('a') >= 0) {
            //     next(new Error('chunk is invalid'));
            // } else {
            //     next();
            // }
        }, 2000);
    }
}

const writeStream = new WritableStream(); // Создаем поток записи

readStream.pipe(writeStream); // Автоматически передаем данные из потока чтения в поток записи

// Для записи данных с помощью потока записи можно использовать следующие функции

// Для управляемого пошаговой записи данных из потока чтения
// let chunk = readStream.read(chunkSize);
// while (chunk !== null) {
//     writableStream.write(chunk, 'utf8');
//     chunk = readStream.read(chunkSize);
// }
// writableStream.end();

// Для автоматической записи данных из потока чтения с помощью событий
// readStream.on('data', function (chunk) {writableStream.write(chunk);});
// readStream.on('end', function () {writableStream.end();});

// Для передачи данных из потока чтения в поток записи
// readStream.pipe(writableStream);

// Дополнительные события для потока записи

// writableStream.on('close', function () {console.log('Write stream closed');});

// Во время работы потока записи данные поступают в его внутренний буфер
// Данный буфер в какой-то момент может быть заполнен данными до предела
// На этот случай поток поступаления данных для записи приостанавливают
// Возобновить дальнейшее получение данных можно будет по событию 'darain' как только буфер потока записи очистится
// writableStream.once('drain', function () {readableStream.resume();});

// writableStream.on('error', function (error) {console.log('Write stream error: ' + error.message);});

 // Это событие для потока записи аналогично событию on('end') для потока чтения
// writableStream.on('finish', function () {console.log('Write stream end');});

})();

// Transform stream

(function(){

const readStream = new stream.Readable(); // Создаем поток чтения
readStream.push('the quick brown fox '); // Заталкиваем во внутренний буфер потока чтения части данных
readStream.push('jumps over the lazy dog.');
readStream.push(null); // Заталкиваем null во внутренний буфер потока чтения для обозначения того, что поток чтения завершен

class TransformStream extends stream.Transform {
    constructor (options) {
        super(options); // Опции потока трансформации данных
    }
    // Для создания потока трансформации обязательно нужно реализовать свой внутренний метод _transform
    // Метод _transform() внутри себя должен обязательно вызывать метод this.push() для передачи данных потребителю данных
    // next - функция, которую нужно будет вызывать каждый раз для перехода к следующей порции данных для трансформации
    _transform (chunk, encoding, next) {
        const data = chunk.toString().toUpperCase(); // Трансформируем переданные в поток трансформации части данных
        this.push(data); // Заталкиваем трансформированную часть данных во внутренний буфер потока трансформации
        next();
    }
    // Опционально можно реализовать внутренний метод  _flush()
    // Данный метод будет вызван в конце потока, когда вся передача данных завершится
    // Его можно использовать для очистки буферов после выполнения все работы
    _flush (next) {
        next();
    }
}

const transformStream = new TransformStream(); // Создаем поток трансформации

readStream.pipe(transformStream).pipe(process.stdout); // Автоматически передаем данные из потока чтения в поток трансформации и выводим их в консоль

// THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.

// Для трансформации данных с помощью потока трансформации можно использовать следующие функции

// Для автоматической трансформации данных из потока чтения с помощью событий
// readStream.on('data', function (chunk) {...);
// readStream.on('end', function () {...});

// Для передачи данных из потока чтения в поток трансформации и далее в поток записи
// readStream.pipe(transformStream).pipe(writableStream);

})();

// Пример трансформации потока данных, состоящих из объектов

(function(){

const data = [
      {id: 1, name: 'object 1', value: 1}
    , {id: 2, name: 'object 2', value: 2}
    , {id: 3, name: 'object 3', value: 3}
]

class ReadStream extends stream.Readable {
    constructor (content, options) {
        super(options);
        this.content = content;
        this.curIndex = 0;
    }
    _read () {
        if (this.curIndex < this.content.length) {
            const chunk = this.content[this.curIndex++];
            this.push(chunk);
        } else {
            this.push(null);
        }
    }
}

class WritableStream extends stream.Writable {
    constructor (options) {
        super(options);
    }
    _write (chunk, encoding, next) {
        console.log('After transform: ' + JSON.stringify(chunk));
        next();
    }
}

class TransformStream extends stream.Transform {
    constructor (options) {
        super(options);
    }
    _transform (chunk, encoding, next) {
        console.log('Transform before: ' + JSON.stringify(chunk));
        if (typeof chunk.originalValue === 'undefined') {
            chunk.originalValue = chunk.value;
            chunk.value++;
        }
        console.log('Transform after: ' + JSON.stringify(chunk));
        if (chunk.originalValue !== 1) {
            this.push(chunk);
        }
        next();
    }
}

const readStream = new ReadStream(data, {objectMode: true});

const writeStream = new WritableStream({objectMode: true});

// Для трансформации данных в виде объектов вместо строк и буферов обязательно надо прописать опцию {objectMode: true}
const transformStream = new TransformStream({objectMode: true});

readStream.pipe(transformStream).pipe(writeStream);

})();

// Duplex stream

(function(){

class DuplexStream extends stream.Duplex {
    constructor (source, options) {
        super(options);
        this.source = source;
    }
    _read (size) {
        this.source.fetchSomeData(size, function (data, encoding) {
            this.push(Buffer.from(data, encoding));
        });
    }
    _write (chunk, encoding, next) {
        // Данный пример с данными обрабытывает только строки
        if (Buffer.isBuffer(chunk)) {
            chunk = chunk.toString();
        }
        this.source.writeSomeData(chunk);
        next();
    }
}

})();

// Пример дуплексного потока данных, состоящих из объектов

(function(){

// Все Transform streams являются Duplex Streams
const myTransform = new stream.Transform(
    {
          writableObjectMode: true
        , transform: function (chunk, encoding, next) {
            // При необходимости привести часть данных к числу
            chunk |= 0;
            // Трансформировать часть данных во что-то ещё
            const data = chunk.toString(16);
            // Затолкнуть данные в очередь для чтения
            next(null, '0'.repeat(data.length % 2) + data);
          }
    }
);

myTransform.setEncoding('ascii');

myTransform.on('data', function (chunk) {console.log(chunk);});

myTransform.write(1); // Prints: 01
myTransform.write(10); // Prints: 0a
myTransform.write(100); // Prints: 64

})();

// Пример генерации данных в потоке чтения и записи их в потоке записи

(function(){

const readableSream = stream.Readable();
const writableStream = stream.Writable();

let char = 97;

readableSream._read = function () { // Данный метод вызывается в цикле чтения потока данных
    readableSream.push(
        String.fromCharCode(char++) // Помещение в буфер чтения данных
    );
    if (char > 'z'.charCodeAt(0)) {
        readableSream.push(null); // Помещение в буфер чтения метки null, сообщающей о завершении потока чтения
    }
};

writableStream._write = function (chunk, encoding, next) { // Данный метод вызывается в цикле записи потока данных
    console.log(chunk.toString('utf8')); // Вывод данных для записи в консоль
    next();
};

readableSream.on('data', function (chunk) {
    writableStream.write(chunk);
});

readableSream.on('end', function () {
    writableStream.end();
});

writableStream.on('finish', function () {
    console.log('Writable stream end');
});

// Помещение данных в буфер потока чтения
readableSream.push('1');
readableSream.push('2');
readableSream.push('3');
// Передача данных в поток чтения, минуя бувер чтения с помощью событий
readableSream.emit('data', '4');
readableSream.emit('data', '5');
readableSream.emit('data', '6');
// Помещение в буфер потока чтения пометки null, говорящей потоку чтения о том, что передача данных завершена
readableSream.push(null); // Readable stream end
console.log('Readable stream end');

// readableSream.pipe(writableStream); // Всё выше перечисленное можно записать в одну эту строку.

})();

// Потоковое чтение и потоковая запись данных из файлов

(function(){

const fs = require('fs');

const readStream = fs.createReadStream('./file1.txt');
const writeStream = fs.createWriteStream('./file2.txt');

readStream.setEncoding('utf8');

// Первый вариант чтения данных с помощью события 'data'
// Отличается от события 'readable' тем, что переводит поток в автоматический режим чтения данных
readStream.on('data', function (chunk) {
    console.log('Read stream data: ' + chunk.toString());
    const ready = writeStream.write(chunk, 'utf8'); // Если внутренниц буфер потока записи уже заполнен до конца, то write() вернет false и процесс поступления новых данных на время остановится
    if (ready === false) {
        this.pause();
        writeStream.once('drain', this.resume.bind(this)); // событие 'drain' сообщает, что запись части данных завершена и можно переходить к чтению и записи следующей части данных
    }
});

// Альтернативный вариант чтения данных с помощью события 'readable'
// Отличается от события 'data' тем, что переводит поток в ручной режим чтения данных
readStream.on('readable', function () {
    let chunk = readStream.read();
    while (chunk !== null) {
        console.log('Read stream data: ' + chunk.toString());
        writeStream.write(chunk, 'utf8');
        chunk = readStream.read();
    }
});

readStream.on('end', function () {
    console.log('Read stream end');
    writeStream.end();
});

writeStream.on('finish', function () {
    console.log('Write stream end');
});

readStream.on('error', function (error) {
    console.log('Read stream error: ' + error.message);
});

writeStream.on('error', function (error) {
    console.log('Write stream error: ' + error.message);
});

// readStream.pipe(writeStream); // Всё выше перечисленное можно записать в одну эту строку.

})();

Комментариев нет:

Отправить комментарий