そんなものか〜程度に理解しただけで、エラー云々全然考えていません(^^;;
Streamを使うことで小分けされたデータ(chunk)を順番に処理する(まとめて一括処理ではない)ことで、メモリの消費を少なくします(全てのデータをメモリに保持しない)
Streamの種類
- Readable Stream :読み込みができるストリーム
*例:fs.ReadStream・http requests - Writable Stream :書き込みができるストリーム
*例:fs.WriteStream ・http responses - Duplex Stream :読み書き両方できるストリーム
*例:net.Socket - Transform Stream :読み書きのタイミングでデータ加工ができるストリーム
*例:zlib.createDeflate()
Streamが使用されている例
- process
stdin・stdout・stderr - fs
createReadStream・createWriteStream - net
socket - zlib
createGzip・createGunzip・createInflate・createDeflate - crypto
createCypheriv
Streamのイベントとよく使うメソッド
イベント | Readable | Writable | Duplex | Transform |
close | ○ | ○ | ○ | ○ |
error | ○ | ○ | ○ | ○ |
data | ○ | ○ | ○ | |
end | ○ | ○ | ○ | |
pause | ○ | ○ | ○ | |
resume | ○ | ○ | ○ | |
readable | ○ | ○ | ○ | |
drain | ○ | |||
finish | ○ | |||
pipe | ○ | |||
unpipe | ○ |
Streamの主なメソッド
- Writable
writable.end([chunk[, encoding]][, callback])
writable.write(chunk[, encoding][, callback]) - Readable
readable.pipe(destination[, options])
readable.read([size])
readable.resume()
readable.pause()
Streamを使ったファイルの読み書き
fsモジュールにファイルをStreamで扱う仕組みがあります
//戻り値:fs.ReadStream
fs.createReadStream(path[, options])
//戻り値:fs.WriteStream
fs.createWriteStream(path[, options])
sample.txtをStreamを利用して読み込む
読み込んだデータはdataイベント(1つデータを読み込んだときに発生するイベント)で取得します
データが最後に到達するとendイベント(全ての読み込みが完了すると発生するイベント)が発行されます
errorイベントはエラー発生時のイベントです
const fs = require("fs");
const fsReader = fs.createReadStream(__dirname + "/sample.txt", "utf8");
let data = "";
fsReader.on("data", (chunk) => {
data += chunk;
});
fsReader.on("end", ()=>{
console.log(data);
});
fsReader.on("error", (err)=>{
console.error(err);
});
読み込んだsample.txtをcopy.txtを作成書き込みます
pipe(stream) :次の処理(stream)に繋ぎます
const fs = require("fs");
const fsReader = fs.createReadStream(__dirname + "/sample.txt", "utf8");
const fsWriter = fs.createWriteStream(__dirname + "/copy.txt", "utf8");
fsReader.on("data", (chunk) => {
fsWriter.write(chunk)
});
//または
const fsReader = fs.createReadStream(__dirname + "/sample.txt", "utf8");
const fsWriter = fs.createWriteStream(__dirname + "/copy.txt", "utf8");
fsReader.pipe(fsWriter);
Buffer
Bufferはある場所から別の場所に転送されるデータのチャンク(小分けされたデータ)のための一時的な保管場所です
Bufferがデータで満たされると次に渡されます
*コンピュータに送信されたすべてのデータは結果を処理して出力する前に、バイナリ(0と1)に変換されます
次にエンコードして型を区別します(画像やビデオなどはバイナリデータです)
下記コードのchunkはBufferです
const fs = require("fs");
const fsReader = fs.createReadStream(__dirname + "/sample.txt");
fsReader.on("data", (chunk) => {
console.log('chunk\n', chunk)
});
//chunk
//<Buffer 0a 3c 73 76 67 20 78 6d 6c ...省略 more bytes>
//chunk
//<Buffer 3d 22 77 70 2d 62 6c 6f 63 ...省略 more bytes>
文字列→Buffer
文字列をBufferに書き込みます
buffer.write(変換したい文字列 [, encoding])
Buffer→文字列
buffer.toString([encoding]);
//process.stdinはReadable Stream
process.stdin.on('data', (chunk)=>{
console.log(chunk)
console.log(chunk.toString())
})
//hello(キーボード入力)
//<Buffer 68 65 6c 6c 6f 0a>
//hello
容量の大きいバイナリファイルを作成してみる
*1e9 は1×10の9乗 (10億)
*-e(コマンドの引数にワンライナープログラムを指定:ファイルを作成せずに実行できる)
*process.stdoutはWritable Stream
$ node -e "process.stdout.write(crypto.randomBytes(1e9))" > binary.file
Node.jsで用意されているエンコーディング
デフォルトは”utf8″
- ascii: ASCII文字列
- utf8 :UTF-8文字列
- utf16le :リトルエンディアンUTF-16(UTF-16LE)文字列
- ucs2 :utf16lfと同じ
- base64: BASE64でエンコードされた文字列
- binary :バイナリデータ(利用は推奨されない)
- hex :16進数で表記された文字列
mp4ファイル(ビデオ)の読み書き
const fs = require("fs");
const fsReader = fs.createReadStream(__dirname + "/sample.mp4")
const fsWriter = fs.createWriteStream(__dirname + "/copy.mp4")
fsReader.pipe(fsWriter).on('error', console.error);
http://localhost:3000にアクセスするとビデオが再々されます
*なんちゃってストリーミング配信^^;
const fs = require('fs');
const http = require('http');
http.createServer((req, res) => {
res.writeHeader(200, { 'Content-Type': 'video/mp4' });
fs.createReadStream( './sample.mp4')
.pipe(res)
.on('error', console.error);
}).listen(3000);
streamモジュール
Readableクラスを継承して配列を1つづつ読みこむカスタムストリームを作成
*クラス継承のメソッドには「_」が付きます
const { Readable } = require('stream');
//出力したい配列
const arrayList = [ "aa", "bb", "cc"];
//Readableクラスを継承
//配列の要素を1つづつ読みこむカスタムストリーム
class arrayReader extends Readable {
constructor(array){
super({encoding:'utf8'});
this.array = array;
this.index = 0
}
_read() {
if(this.index < this.array.length){
const chunk = `${this.array[this.index]}\n`
this.push(chunk);
this.index += 1;
}
}
}
const arrayStream = new arrayReader(arrayList);
// arrayStream.on('data', (chunk) => console.log(chunk));
// aa
// bb
// cc
//書き込む
const fs = require("fs");
const fsWriter = fs.createWriteStream(__dirname + "/copy.txt")
arrayStream.pipe(fsWriter)
オブジェクトモードについて
Node.jsのStreamは「文字列もしくはBufferオブジェクト」を操作するためのものです
それ以外の型で実装する場合はobjectModeオプションを使用してオブジェクトモードに切り替えます
const {Readable} = require('stream');
const arrayList = [ "aa", "bb", "cc"];
class arrayReader extends Readable {
constructor(array){
//オブジェクトモード
super({ objectMode: true});
this.array = array;
this.index = 0
}
_read() {
if(this.index < this.array.length){
//オブジェクトにする
const chunk = {
data: this.array[this.index],
index: this.index
}
this.push(chunk);
this.index += 1;
}
}
}
const arrayStream = new arrayReader(arrayList);
arrayStream.on('data', (chunk) => console.log(chunk));
//{ data: 'aa', index: 0 }
//{ data: 'bb', index: 1 }
//{ data: 'cc', index: 2 }
入力した文字を大文字にして出力する(データ加工)
const {Transform} = require('stream');
class toUpperCaseText extends Transform {
constructor(){
super();
}
_transform(chunk, encoding, callback){
this.push(chunk.toString().toUpperCase());
callback();
}
}
const toUpperCaseStream = new toUpperCaseText();
process.stdin.pipe(toUpperCaseStream).pipe(process.stdout);
//aaa
//AAA