Streamについて (Node.js)

そんなものか〜程度に理解しただけで、エラー云々全然考えていません(^^;;

Streamを使うことで小分けされたデータ(chunk)順番に処理する(まとめて一括処理ではない)ことで、メモリの消費を少なくします(全てのデータをメモリに保持しない)

Streamの種類

  • Readable Stream :読み込みができるストリーム
    *例:fs.ReadStream・http requests
  • Writable Stream :書き込みができるストリーム
    *例:fs.WriteStream ・http responses
  • Duplex Stream :読み書き両方できるストリーム
    *例:net.Socket
  • Transform Stream :読み書きのタイミングでデータ加工ができるストリーム
    *例:zlib.createDeflate()

Streamが使用されている例

  • process
    stdin・stdout・stderr
  • fs
    createReadStream・createWriteStream
  • net
    socket
  • zlib
    createGzip・createGunzip・createInflate・createDeflate
  • crypto
    createCypheriv
目次
  1. Streamのイベントとよく使うメソッド
  2. Streamを使ったファイルの読み書き
  3. Buffer
  4. streamモジュール

Streamのイベントとよく使うメソッド

イベントReadableWritableDuplexTransform
close
error
data
end
pause
resume
readable
drain
finish
pipe
unpipe

Streamの主なメソッド

  • Writable
    writable.end([chunk[, encoding]][, callback])
    writable.write(chunk[, encoding][, callback])
  • Readable
    readable.pipe(destination[, options])
    readable.read([size])
    readable.resume()
    readable.pause()

Streamを使ったファイルの読み書き

fsモジュールにファイルをStreamで扱う仕組みがあります

//戻り値:fs.ReadStream
fs.createReadStream(path[, options])

//戻り値:fs.WriteStream
fs.createWriteStream(path[, options])

sample.txtをStreamを利用して読み込む
読み込んだデータはdataイベント(1つデータを読み込んだときに発生するイベント)で取得します
データが最後に到達するとendイベント(全ての読み込みが完了すると発生するイベント)が発行されます
errorイベントはエラー発生時のイベントです

const fs = require("fs");
const fsReader = fs.createReadStream(__dirname + "/sample.txt", "utf8");
let data = "";
fsReader.on("data", (chunk) => {
  data += chunk;
});
fsReader.on("end", ()=>{
  console.log(data);
});
fsReader.on("error", (err)=>{
  console.error(err);
});

読み込んだsample.txtをcopy.txtを作成書き込みます
pipe(stream) :次の処理(stream)に繋ぎます

const fs = require("fs");

const fsReader = fs.createReadStream(__dirname + "/sample.txt", "utf8");
const fsWriter = fs.createWriteStream(__dirname + "/copy.txt", "utf8");
fsReader.on("data", (chunk) => {
  fsWriter.write(chunk)
});

//または
const fsReader = fs.createReadStream(__dirname + "/sample.txt", "utf8");
const fsWriter = fs.createWriteStream(__dirname + "/copy.txt", "utf8");
fsReader.pipe(fsWriter);

Buffer

Bufferはある場所から別の場所に転送されるデータのチャンク(小分けされたデータ)のための一時的な保管場所です
Bufferがデータで満たされると次に渡されます

*コンピュータに送信されたすべてのデータは結果を処理して出力する前に、バイナリ(0と1)に変換されます
次にエンコードして型を区別します(画像やビデオなどはバイナリデータです)

下記コードのchunkはBufferです

const fs = require("fs");
const fsReader = fs.createReadStream(__dirname + "/sample.txt");
fsReader.on("data", (chunk) => {
  console.log('chunk\n', chunk)
});

//chunk
//<Buffer 0a 3c 73 76 67 20 78 6d 6c ...省略  more bytes>
//chunk
//<Buffer 3d 22 77 70 2d 62 6c 6f 63 ...省略  more bytes>

文字列→Buffer
文字列をBufferに書き込みます

buffer.write(変換したい文字列 [, encoding]) 

Buffer→文字列

 buffer.toString([encoding]); 
//process.stdinはReadable Stream
process.stdin.on('data', (chunk)=>{
   console.log(chunk)
    console.log(chunk.toString())
})
//hello(キーボード入力)
//<Buffer 68 65 6c 6c 6f 0a>
//hello

容量の大きいバイナリファイルを作成してみる
*1e9 は1×10の9乗 (10億)
*-e(コマンドの引数にワンライナープログラムを指定:ファイルを作成せずに実行できる)
*process.stdoutはWritable Stream

$ node -e "process.stdout.write(crypto.randomBytes(1e9))" > binary.file

Node.jsで用意されているエンコーディング
デフォルトは”utf8″

  • ascii: ASCII文字列
  • utf8 :UTF-8文字列
  • utf16le :リトルエンディアンUTF-16(UTF-16LE)文字列
  • ucs2 :utf16lfと同じ
  • base64: BASE64でエンコードされた文字列
  • binary :バイナリデータ(利用は推奨されない)
  • hex :16進数で表記された文字列

mp4ファイル(ビデオ)の読み書き

const fs = require("fs");
const fsReader = fs.createReadStream(__dirname + "/sample.mp4")
const fsWriter = fs.createWriteStream(__dirname + "/copy.mp4")
fsReader.pipe(fsWriter).on('error', console.error);

http://localhost:3000にアクセスするとビデオが再々されます
*なんちゃってストリーミング配信^^;

const fs = require('fs');
const http =  require('http');

http.createServer((req, res) => {
    res.writeHeader(200, { 'Content-Type': 'video/mp4' });
    fs.createReadStream( './sample.mp4')
        .pipe(res)
        .on('error', console.error);
}).listen(3000);

streamモジュール

Readableクラスを継承して配列を1つづつ読みこむカスタムストリームを作成
*クラス継承のメソッドには「_」が付きます

const { Readable } = require('stream');
//出力したい配列
const arrayList = [ "aa", "bb", "cc"];
//Readableクラスを継承
//配列の要素を1つづつ読みこむカスタムストリーム
class arrayReader extends Readable {
  constructor(array){
    super({encoding:'utf8'});
    this.array = array;
    this.index = 0
  }
  _read() {
    if(this.index < this.array.length){
      const chunk = `${this.array[this.index]}\n`
      this.push(chunk);
      this.index += 1;
    }
  }
}

const arrayStream = new arrayReader(arrayList);
// arrayStream.on('data', (chunk) => console.log(chunk));
// aa
// bb
// cc

//書き込む
const fs = require("fs");
const fsWriter = fs.createWriteStream(__dirname + "/copy.txt")
arrayStream.pipe(fsWriter)

オブジェクトモードについて
Node.jsのStreamは「文字列もしくはBufferオブジェクト」を操作するためのものです
それ以外の型で実装する場合はobjectModeオプションを使用してオブジェクトモードに切り替えます

const {Readable} = require('stream');
const arrayList = [ "aa", "bb", "cc"];

class arrayReader extends Readable {
  constructor(array){
//オブジェクトモード
    super({ objectMode: true});
    this.array = array;
    this.index = 0
  }
  _read() {
    if(this.index < this.array.length){
//オブジェクトにする
      const chunk = {
        data: this.array[this.index],
        index: this.index
      }
      this.push(chunk);
      this.index += 1;
    }
  }
}
const arrayStream = new arrayReader(arrayList);
arrayStream.on('data', (chunk) => console.log(chunk));
//{ data: 'aa', index: 0 }
//{ data: 'bb', index: 1 }
//{ data: 'cc', index: 2 }

入力した文字を大文字にして出力する(データ加工)

const {Transform} = require('stream');

class toUpperCaseText extends Transform {
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}
const toUpperCaseStream = new toUpperCaseText();
process.stdin.pipe(toUpperCaseStream).pipe(process.stdout);
//aaa
//AAA