스트림
소스 코드: lib/stream.js
스트림은 Node.js에서 스트리밍 데이터를 처리하기 위한 추상 인터페이스입니다. node:stream
모듈은 스트림 인터페이스를 구현하기 위한 API를 제공합니다.
Node.js에서 제공하는 스트림 객체는 많이 있습니다. 예를 들어, HTTP 서버에 대한 요청과 process.stdout
모두 스트림 인스턴스입니다.
스트림은 읽기 가능, 쓰기 가능 또는 둘 다일 수 있습니다. 모든 스트림은 EventEmitter
의 인스턴스입니다.
node:stream
모듈에 접근하려면 다음을 사용하십시오.
const stream = require('node:stream')
node:stream
모듈은 새로운 유형의 스트림 인스턴스를 만드는 데 유용합니다. 스트림을 사용하기 위해 node:stream
모듈을 사용할 필요는 일반적으로 없습니다.
이 문서의 구성
이 문서는 두 개의 주요 섹션과 주석을 위한 세 번째 섹션으로 구성되어 있습니다. 첫 번째 섹션에서는 애플리케이션 내에서 기존 스트림을 사용하는 방법을 설명합니다. 두 번째 섹션에서는 새로운 유형의 스트림을 만드는 방법을 설명합니다.
스트림 유형
Node.js에는 네 가지 기본 스트림 유형이 있습니다.
Writable
: 데이터를 쓸 수 있는 스트림 (예:fs.createWriteStream()
).Readable
: 데이터를 읽을 수 있는 스트림 (예:fs.createReadStream()
).Duplex
:Readable
및Writable
둘 다인 스트림 (예:net.Socket
).Transform
: 쓰기 및 읽기 시 데이터를 수정하거나 변환할 수 있는Duplex
스트림 (예:zlib.createDeflate()
).
또한, 이 모듈에는 유틸리티 함수 stream.duplexPair()
, stream.pipeline()
, stream.finished()
, stream.Readable.from()
및 stream.addAbortSignal()
이 포함되어 있습니다.
스트림 프로미스 API
추가된 버전: v15.0.0
stream/promises
API는 콜백을 사용하는 대신 Promise
객체를 반환하는 스트림에 대한 대체 비동기 유틸리티 함수 세트를 제공합니다. API는 require('node:stream/promises')
또는 require('node:stream').promises
를 통해 액세스할 수 있습니다.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[기록]
버전 | 변경 사항 |
---|---|
v18.0.0, v17.2.0, v16.14.0 | 소스가 종료될 때 대상 스트림이 자동으로 닫히는 것을 방지하기 위해 end 옵션을 false 로 설정할 수 있습니다. |
v15.0.0 | 추가된 버전: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- 반환 값: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- 반환 값: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- 반환 값: <Promise> | <AsyncIterable>
options
<Object> 파이프라인 옵션signal
<AbortSignal>end
<boolean> 소스 스트림이 종료될 때 대상 스트림을 종료합니다. 이 값이false
여도 변환 스트림은 항상 종료됩니다. 기본값:true
.
반환 값: <Promise> 파이프라인이 완료되면 이행됩니다.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('파이프라인이 성공했습니다.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('파이프라인이 성공했습니다.')
AbortSignal
을 사용하려면 마지막 인수로 옵션 객체 내부에 전달합니다. 신호가 중단되면 기본 파이프라인에서 AbortError
와 함께 destroy
가 호출됩니다.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
pipeline
API는 비동기 생성기도 지원합니다.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // `Buffer` 대신 문자열로 작업합니다.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('파이프라인이 성공했습니다.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // `Buffer` 대신 문자열로 작업합니다.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('파이프라인이 성공했습니다.')
비동기 생성기에 전달된 signal
인수를 처리해야 합니다. 특히 비동기 생성기가 파이프라인의 소스(즉, 첫 번째 인수)인 경우 또는 파이프라인이 절대 완료되지 않는 경우에 그렇습니다.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('파이프라인이 성공했습니다.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('파이프라인이 성공했습니다.')
pipeline
API는 콜백 버전을 제공합니다.
stream.finished(stream[, options])
[History]
Version | Changes |
---|---|
v19.5.0, v18.14.0 | ReadableStream 및 WritableStream 에 대한 지원이 추가되었습니다. |
v19.1.0, v18.13.0 | cleanup 옵션이 추가되었습니다. |
v15.0.0 | v15.0.0에 추가되었습니다. |
stream
<Stream> | <ReadableStream> | <WritableStream> 읽기 및/또는 쓰기 가능한 스트림/웹스트림입니다.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined>true
인 경우, 프로미스가 완료되기 전에 이 함수에 의해 등록된 리스너를 제거합니다. 기본값:false
.
반환: <Promise> 스트림이 더 이상 읽거나 쓸 수 없게 되면 완료됩니다.
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('스트림 읽기가 완료되었습니다.')
}
run().catch(console.error)
rs.resume() // 스트림을 드레인합니다.
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('스트림 읽기가 완료되었습니다.')
}
run().catch(console.error)
rs.resume() // 스트림을 드레인합니다.
finished
API는 콜백 버전도 제공합니다.
stream.finished()
는 반환된 프로미스가 확인되거나 거부된 후에도 매달린 이벤트 리스너(특히 'error'
, 'end'
, 'finish'
및 'close'
)를 남깁니다. 이는 예상치 못한 'error'
이벤트(잘못된 스트림 구현으로 인해)로 인해 예상치 못한 충돌이 발생하지 않도록 하기 위함입니다. 이것이 원치 않는 동작이라면 options.cleanup
을 true
로 설정해야 합니다.
await finished(rs, { cleanup: true })
객체 모드
Node.js API에서 생성된 모든 스트림은 문자열, <Buffer>, <TypedArray> 및 <DataView> 객체에서만 작동합니다.
문자열
과버퍼
는 스트림에서 가장 일반적으로 사용되는 유형입니다.TypedArray
및DataView
를 사용하면Int32Array
또는Uint8Array
와 같은 유형으로 바이너리 데이터를 처리할 수 있습니다. TypedArray 또는 DataView를 스트림에 쓰면 Node.js가 원시 바이트를 처리합니다.
그러나 스트림 구현에서 다른 유형의 JavaScript 값(스트림 내에서 특별한 용도로 사용되는 null
제외)을 사용할 수 있습니다. 이러한 스트림은 "객체 모드"로 작동하는 것으로 간주됩니다.
스트림 인스턴스는 스트림을 생성할 때 objectMode
옵션을 사용하여 객체 모드로 전환됩니다. 기존 스트림을 객체 모드로 전환하려는 시도는 안전하지 않습니다.
버퍼링
Writable
및 Readable
스트림 모두 내부 버퍼에 데이터를 저장합니다.
잠재적으로 버퍼링되는 데이터의 양은 스트림의 생성자에 전달된 highWaterMark
옵션에 따라 달라집니다. 일반 스트림의 경우 highWaterMark
옵션은 총 바이트 수를 지정합니다. 객체 모드로 작동하는 스트림의 경우 highWaterMark
는 총 객체 수를 지정합니다. (디코딩하지 않는) 문자열에서 작동하는 스트림의 경우 highWaterMark
는 총 UTF-16 코드 단위 수를 지정합니다.
데이터는 구현에서 stream.push(chunk)
를 호출할 때 Readable
스트림에 버퍼링됩니다. 스트림의 소비자가 stream.read()
를 호출하지 않으면 데이터는 소비될 때까지 내부 큐에 남아 있습니다.
내부 읽기 버퍼의 총 크기가 highWaterMark
에 지정된 임계값에 도달하면 현재 버퍼링된 데이터를 소비할 수 있을 때까지 스트림은 기본 리소스에서 데이터 읽기를 일시적으로 중지합니다(즉, 스트림은 읽기 버퍼를 채우는 데 사용되는 내부 readable._read()
메서드 호출을 중지합니다).
데이터는 writable.write(chunk)
메서드가 반복적으로 호출될 때 Writable
스트림에 버퍼링됩니다. 내부 쓰기 버퍼의 총 크기가 highWaterMark
에서 설정한 임계값 미만인 동안 writable.write()
에 대한 호출은 true
를 반환합니다. 내부 버퍼의 크기가 highWaterMark
에 도달하거나 초과하면 false
가 반환됩니다.
stream
API, 특히 stream.pipe()
메서드의 주요 목표는 서로 다른 속도의 소스와 대상이 사용 가능한 메모리를 압도하지 않도록 데이터를 허용 가능한 수준으로 버퍼링하는 것을 제한하는 것입니다.
highWaterMark
옵션은 임계값이지 제한이 아닙니다. 스트림이 더 많은 데이터를 요청하기 전에 버퍼링하는 데이터의 양을 지정합니다. 일반적으로 엄격한 메모리 제한을 적용하지 않습니다. 특정 스트림 구현은 더 엄격한 제한을 적용하도록 선택할 수 있지만 이는 선택 사항입니다.
Duplex
및 Transform
스트림은 모두 Readable
및 Writable
이므로 읽기 및 쓰기에 사용되는 두 개의 별도 내부 버퍼를 유지하여 각 측면이 적절하고 효율적인 데이터 흐름을 유지하면서 서로 독립적으로 작동할 수 있습니다. 예를 들어, net.Socket
인스턴스는 소켓에서 수신된 데이터의 소비를 허용하는 Readable
측면과 소켓에 데이터를 쓰기를 허용하는 Writable
측면을 가진 Duplex
스트림입니다. 데이터가 수신되는 속도보다 빠르거나 느린 속도로 소켓에 쓸 수 있기 때문에 각 측면은 서로 독립적으로 작동(및 버퍼링)해야 합니다.
내부 버퍼링의 메커니즘은 내부 구현 세부 사항이며 언제든지 변경될 수 있습니다. 그러나 특정 고급 구현의 경우 writable.writableBuffer
또는 readable.readableBuffer
를 사용하여 내부 버퍼를 검색할 수 있습니다. 문서화되지 않은 이러한 속성의 사용은 권장되지 않습니다.
스트림 소비자용 API
거의 모든 Node.js 애플리케이션은 아무리 간단하더라도 어떤 방식으로든 스트림을 사용합니다. 다음은 HTTP 서버를 구현하는 Node.js 애플리케이션에서 스트림을 사용하는 예입니다.
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req`는 읽기 가능한 스트림인 http.IncomingMessage입니다.
// `res`는 쓰기 가능한 스트림인 http.ServerResponse입니다.
let body = ''
// 데이터를 utf8 문자열로 가져옵니다.
// 인코딩이 설정되지 않으면 Buffer 객체가 수신됩니다.
req.setEncoding('utf8')
// 읽기 가능한 스트림은 리스너가 추가되면 'data' 이벤트를 발생시킵니다.
req.on('data', chunk => {
body += chunk
})
// 'end' 이벤트는 전체 본문을 수신했음을 나타냅니다.
req.on('end', () => {
try {
const data = JSON.parse(body)
// 사용자에게 흥미로운 내용을 다시 씁니다.
res.write(typeof data)
res.end()
} catch (er) {
// 이런! 잘못된 json!
res.statusCode = 400
return res.end(`오류: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// 오류: 예기치 않은 토큰 'o', "not json"은 유효한 JSON이 아닙니다.
Writable
스트림(예시의 res
와 같은)은 스트림에 데이터를 쓰는 데 사용되는 write()
및 end()
와 같은 메서드를 노출합니다.
Readable
스트림은 스트림에서 읽을 수 있는 데이터가 있을 때 애플리케이션 코드에 알리기 위해 EventEmitter
API를 사용합니다. 사용 가능한 데이터는 여러 가지 방법으로 스트림에서 읽을 수 있습니다.
Writable
및 Readable
스트림 모두 EventEmitter
API를 다양한 방식으로 사용하여 스트림의 현재 상태를 전달합니다.
Duplex
및 Transform
스트림은 모두 Writable
및 Readable
입니다.
스트림에 데이터를 쓰거나 스트림에서 데이터를 소비하는 애플리케이션은 스트림 인터페이스를 직접 구현할 필요가 없으며 일반적으로 require('node:stream')
을 호출할 이유가 없습니다.
새로운 유형의 스트림을 구현하려는 개발자는 스트림 구현자용 API 섹션을 참조해야 합니다.
쓰기 가능 스트림
쓰기 가능 스트림은 데이터를 쓰는 대상에 대한 추상화입니다.
Writable
스트림의 예는 다음과 같습니다.
- 클라이언트의 HTTP 요청
- 서버의 HTTP 응답
- fs 쓰기 스트림
- zlib 스트림
- crypto 스트림
- TCP 소켓
- 자식 프로세스 stdin
process.stdout
,process.stderr
이러한 예 중 일부는 실제로 Writable
인터페이스를 구현하는 Duplex
스트림입니다.
모든 Writable
스트림은 stream.Writable
클래스에 의해 정의된 인터페이스를 구현합니다.
Writable
스트림의 특정 인스턴스는 다양한 방식으로 다를 수 있지만, 모든 Writable
스트림은 아래 예에서 설명된 것과 동일한 기본 사용 패턴을 따릅니다.
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')
클래스: stream.Writable
추가된 버전: v0.9.4
이벤트: 'close'
[히스토리]
버전 | 변경 사항 |
---|---|
v10.0.0 | destroy 시 'close' 가 방출되는지 여부를 지정하는 emitClose 옵션 추가. |
v0.9.4 | 추가된 버전: v0.9.4 |
스트림과 기본 리소스 (예: 파일 디스크립터)가 닫히면 'close'
이벤트가 방출됩니다. 이 이벤트는 더 이상 이벤트가 방출되지 않고 더 이상 계산이 발생하지 않음을 나타냅니다.
Writable
스트림은 emitClose
옵션으로 생성된 경우 항상 'close'
이벤트를 방출합니다.
이벤트: 'drain'
추가된 버전: v0.9.4
stream.write(chunk)
호출이 false
를 반환하면, 스트림에 데이터 쓰기를 재개하는 것이 적절할 때 'drain'
이벤트가 방출됩니다.
// 제공된 쓰기 가능 스트림에 데이터를 백만 번 씁니다.
// 백 프레셔에 주의하십시오.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// 마지막!
writer.write(data, encoding, callback)
} else {
// 계속해야 할지, 기다려야 할지 확인합니다.
// 아직 완료되지 않았으므로 콜백을 전달하지 않습니다.
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// 일찍 중단해야 했습니다!
// 드레인되면 더 씁니다.
writer.once('drain', write)
}
}
}
Event: 'error'
추가된 버전: v0.9.4
'error'
이벤트는 데이터를 쓰거나 파이핑하는 동안 오류가 발생한 경우에 발생합니다. 리스너 콜백은 호출될 때 단일 Error
인수를 전달받습니다.
스트림을 생성할 때 autoDestroy
옵션이 false
로 설정되지 않은 경우, 'error'
이벤트가 발생하면 스트림이 닫힙니다.
'error'
이후에는 'close'
이외의 다른 이벤트 ( 'error'
이벤트 포함)는 발생해서는 안 됩니다.
Event: 'finish'
추가된 버전: v0.9.4
'finish'
이벤트는 stream.end()
메서드가 호출되고 모든 데이터가 기본 시스템으로 플러시된 후에 발생합니다.
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('모든 쓰기가 완료되었습니다.')
})
writer.end('이것이 끝입니다\n')
Event: 'pipe'
추가된 버전: v0.9.4
src
<stream.Readable> 이 쓰기 가능 스트림으로 파이핑되는 소스 스트림
'pipe'
이벤트는 읽기 가능 스트림에서 stream.pipe()
메서드가 호출될 때 발생하여 이 쓰기 가능 스트림을 대상 집합에 추가합니다.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('무언가가 쓰기 스트림으로 파이핑되고 있습니다.')
assert.equal(src, reader)
})
reader.pipe(writer)
Event: 'unpipe'
추가된 버전: v0.9.4
src
<stream.Readable> 이 쓰기 가능 스트림에서 언파이프된 소스 스트림
'unpipe'
이벤트는 Readable
스트림에서 stream.unpipe()
메서드가 호출되어 대상 집합에서 이 Writable
스트림을 제거할 때 발생합니다.
또한 Readable
스트림이 파이핑될 때 이 Writable
스트림에서 오류가 발생한 경우에도 발생합니다.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('무언가가 쓰기 스트림으로 파이핑되는 것을 중단했습니다.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
추가된 버전: v0.11.2
writable.cork()
메서드는 작성된 모든 데이터를 메모리에 버퍼링하도록 강제합니다. 버퍼링된 데이터는 stream.uncork()
또는 stream.end()
메서드가 호출될 때 플러시됩니다.
writable.cork()
의 주요 목적은 여러 개의 작은 청크가 빠른 속도로 스트림에 작성되는 상황을 수용하는 것입니다. 즉시 기본 대상으로 전달하는 대신 writable.cork()
는 writable.uncork()
가 호출될 때까지 모든 청크를 버퍼링합니다. 그러면 모든 청크를 writable._writev()
(있는 경우)로 전달합니다. 이렇게 하면 첫 번째 작은 청크가 처리될 때까지 데이터가 버퍼링되는 head-of-line 차단 상황을 방지합니다. 그러나 writable._writev()
를 구현하지 않고 writable.cork()
를 사용하면 처리량에 부정적인 영향을 미칠 수 있습니다.
참고: writable.uncork()
, writable._writev()
.
writable.destroy([error])
[기록]
버전 | 변경 사항 |
---|---|
v14.0.0 | 이미 소멸된 스트림에서 no-op으로 작동합니다. |
v8.0.0 | 추가된 버전: v8.0.0 |
스트림을 소멸시킵니다. 선택적으로 'error'
이벤트를 내보내고 'close'
이벤트를 내보냅니다 (단, emitClose
가 false
로 설정되지 않은 경우). 이 호출 후에는 쓰기 가능한 스트림이 종료되고 write()
또는 end()
에 대한 후속 호출은 ERR_STREAM_DESTROYED
오류를 발생시킵니다. 이는 스트림을 소멸시키는 파괴적이고 즉각적인 방법입니다. write()
에 대한 이전 호출이 드레인되지 않았을 수 있으며 ERR_STREAM_DESTROYED
오류를 트리거할 수 있습니다. 닫기 전에 데이터를 플러시해야 하는 경우 destroy 대신 end()
를 사용하거나 스트림을 소멸시키기 전에 'drain'
이벤트를 기다립니다.
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo 오류')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo 오류
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
destroy()
가 호출된 후에는 추가 호출이 no-op이 되고 _destroy()
에서 발생한 오류를 제외하고 'error'
로 더 이상 오류가 내보내지지 않습니다.
구현자는 이 메서드를 재정의해서는 안 되며 대신 writable._destroy()
를 구현해야 합니다.
writable.closed
추가된 버전: v18.0.0
'close'
가 발생한 후 true
입니다.
writable.destroyed
추가된 버전: v8.0.0
writable.destroy()
가 호출된 후 true
입니다.
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[내역]
버전 | 변경 사항 |
---|---|
v22.0.0, v20.13.0 | chunk 인수가 이제 TypedArray 또는 DataView 인스턴스일 수 있습니다. |
v15.0.0 | callback 은 'finish' 전이나 오류 시 호출됩니다. |
v14.0.0 | 'finish' 또는 'error'가 발생하면 callback 이 호출됩니다. |
v10.0.0 | 이 메서드는 이제 writable 에 대한 참조를 반환합니다. |
v8.0.0 | chunk 인수가 이제 Uint8Array 인스턴스일 수 있습니다. |
v0.9.4 | 추가된 버전: v0.9.4 |
chunk
<string>
|<Buffer>
|<TypedArray>
|<DataView>
|<any>
쓸 선택적 데이터입니다. 객체 모드로 작동하지 않는 스트림의 경우,chunk
는<string>
,<Buffer>
,<TypedArray>
또는<DataView>
여야 합니다. 객체 모드 스트림의 경우,chunk
는null
이 아닌 모든 JavaScript 값일 수 있습니다.encoding
<string>
chunk
가 문자열인 경우 인코딩callback
<Function>
스트림이 완료되었을 때의 콜백- 반환값:
<this>
writable.end()
메서드를 호출하면 더 이상 데이터를 Writable
에 쓰지 않음을 알립니다. 선택적 chunk
및 encoding
인수를 사용하면 스트림을 닫기 직전에 마지막 데이터 청크를 추가로 쓸 수 있습니다.
stream.end()
를 호출한 후 stream.write()
메서드를 호출하면 오류가 발생합니다.
// 'hello, '를 쓰고 'world!'로 끝냅니다.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// 이제 더 이상 쓸 수 없습니다!
writable.setDefaultEncoding(encoding)
[History]
Version | Changes |
---|---|
v6.1.0 | 이 메서드는 이제 writable 에 대한 참조를 반환합니다. |
v0.11.15 | 추가됨: v0.11.15 |
writable.setDefaultEncoding()
메서드는 Writable
스트림의 기본 encoding
을 설정합니다.
writable.uncork()
추가됨: v0.11.2
writable.uncork()
메서드는 stream.cork()
가 호출된 이후 버퍼링된 모든 데이터를 플러시합니다.
writable.cork()
및 writable.uncork()
를 사용하여 스트림에 대한 쓰기 버퍼링을 관리할 때 process.nextTick()
을 사용하여 writable.uncork()
에 대한 호출을 지연합니다. 이렇게 하면 주어진 Node.js 이벤트 루프 단계 내에서 발생하는 모든 writable.write()
호출의 일괄 처리가 가능합니다.
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())
스트림에서 writable.cork()
메서드가 여러 번 호출되면 버퍼링된 데이터를 플러시하려면 writable.uncork()
를 동일한 횟수만큼 호출해야 합니다.
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
stream.uncork()
// uncork()가 두 번째로 호출될 때까지 데이터가 플러시되지 않습니다.
stream.uncork()
})
참고: writable.cork()
.
writable.writable
추가됨: v11.4.0
writable.write()
를 호출해도 안전한 경우 true
입니다. 이는 스트림이 파괴, 오류 또는 종료되지 않았음을 의미합니다.
writable.writableAborted
추가됨: v18.0.0, v16.17.0
스트림이 'finish'
를 내보내기 전에 파괴되었거나 오류가 발생했는지 여부를 반환합니다.
writable.writableEnded
추가된 버전: v12.9.0
writable.end()
가 호출된 후 true
가 됩니다. 이 속성은 데이터가 플러시되었는지 여부를 나타내지 않습니다. 대신 writable.writableFinished
를 사용하세요.
writable.writableCorked
추가된 버전: v13.2.0, v12.16.0
스트림의 코르크를 완전히 해제하기 위해 writable.uncork()
를 호출해야 하는 횟수입니다.
writable.errored
추가된 버전: v18.0.0
스트림이 오류와 함께 파괴된 경우 오류를 반환합니다.
writable.writableFinished
추가된 버전: v12.6.0
'finish'
이벤트가 발생하기 직전에 true
로 설정됩니다.
writable.writableHighWaterMark
추가된 버전: v9.3.0
이 Writable
을 만들 때 전달된 highWaterMark
의 값을 반환합니다.
writable.writableLength
추가된 버전: v9.4.0
이 속성에는 쓰기 준비가 된 큐의 바이트 수(또는 객체 수)가 포함됩니다. 이 값은 highWaterMark
의 상태에 대한 내부 정보를 제공합니다.
writable.writableNeedDrain
추가된 버전: v15.2.0, v14.17.0
스트림의 버퍼가 가득 차서 스트림이 'drain'
을 발생시킬 경우 true
입니다.
writable.writableObjectMode
추가된 버전: v12.3.0
주어진 Writable
스트림의 objectMode
속성에 대한 getter입니다.
writable[Symbol.asyncDispose]()
추가된 버전: v22.4.0, v20.16.0
AbortError
와 함께 writable.destroy()
를 호출하고 스트림이 완료될 때 이행되는 Promise를 반환합니다.
writable.write(chunk[, encoding][, callback])
[기록]
버전 | 변경 사항 |
---|---|
v22.0.0, v20.13.0 | chunk 인수가 이제 TypedArray 또는 DataView 인스턴스가 될 수 있습니다. |
v8.0.0 | chunk 인수가 이제 Uint8Array 인스턴스가 될 수 있습니다. |
v6.0.0 | chunk 매개변수로 null 을 전달하면 이제 객체 모드에서도 항상 유효하지 않은 것으로 간주됩니다. |
v0.9.4 | 추가된 버전: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> 작성할 선택적 데이터. 객체 모드로 작동하지 않는 스트림의 경우,chunk
는 <string>, <Buffer>, <TypedArray> 또는 <DataView>여야 합니다. 객체 모드 스트림의 경우,chunk
는null
이외의 모든 JavaScript 값이 될 수 있습니다.encoding
<string> | <null>chunk
가 문자열인 경우의 인코딩. 기본값:'utf8'
callback
<Function> 이 데이터 청크가 플러시될 때의 콜백.- 반환값: <boolean> 스트림이 추가 데이터를 계속 작성하기 전에
'drain'
이벤트가 발생하기를 호출 코드가 기다리기를 원하는 경우false
; 그렇지 않으면true
.
writable.write()
메서드는 스트림에 일부 데이터를 쓰고, 데이터가 완전히 처리되면 제공된 callback
을 호출합니다. 오류가 발생하면 callback
은 오류를 첫 번째 인수로 사용하여 호출됩니다. callback
은 비동기적으로 호출되며 'error'
가 발생하기 전에 호출됩니다.
반환 값은 chunk
를 수용한 후 내부 버퍼가 스트림이 생성될 때 구성된 highWaterMark
보다 작으면 true
입니다. false
가 반환되면, 'drain'
이벤트가 발생할 때까지 스트림에 데이터를 더 쓰려는 시도를 중지해야 합니다.
스트림이 배수되지 않는 동안 write()
호출은 chunk
를 버퍼링하고 false를 반환합니다. 현재 버퍼링된 모든 청크가 배수되면 (운영 체제에서 전달을 위해 수락됨) 'drain'
이벤트가 발생합니다. write()
가 false를 반환한 후에는 'drain'
이벤트가 발생할 때까지 더 많은 청크를 쓰지 마십시오. 배수되지 않는 스트림에서 write()
를 호출하는 것이 허용되지만, Node.js는 최대 메모리 사용량이 발생할 때까지 작성된 모든 청크를 버퍼링하고, 그 시점에서 무조건적으로 중단합니다. 중단하기 전에도 높은 메모리 사용량은 가비지 수집기의 성능 저하와 높은 RSS (메모리가 더 이상 필요하지 않은 후에도 일반적으로 시스템으로 반환되지 않음)를 유발합니다. TCP 소켓은 원격 피어가 데이터를 읽지 않으면 배수되지 않을 수 있으므로, 배수되지 않는 소켓을 작성하는 것은 원격으로 악용 가능한 취약점으로 이어질 수 있습니다.
스트림이 배수되지 않는 동안 데이터를 쓰는 것은 Transform
스트림이 파이핑되거나 'data'
또는 'readable'
이벤트 핸들러가 추가될 때까지 기본적으로 일시 중지되기 때문에 특히 문제가 됩니다.
작성할 데이터를 필요에 따라 생성하거나 가져올 수 있는 경우, 로직을 Readable
로 캡슐화하고 stream.pipe()
를 사용하는 것이 좋습니다. 그러나 write()
를 호출하는 것이 선호되는 경우, 'drain'
이벤트를 사용하여 백프레셔를 존중하고 메모리 문제를 피할 수 있습니다.
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// 다른 쓰기를 수행하기 전에 cb가 호출될 때까지 기다립니다.
write('hello', () => {
console.log('쓰기 완료, 이제 더 많은 쓰기를 수행합니다.')
})
객체 모드의 Writable
스트림은 항상 encoding
인수를 무시합니다.
읽기 가능한 스트림
읽기 가능한 스트림은 데이터를 소비하는 소스에 대한 추상화입니다.
Readable
스트림의 예는 다음과 같습니다:
모든 Readable
스트림은 stream.Readable
클래스에 의해 정의된 인터페이스를 구현합니다.
두 가지 읽기 모드
Readable
스트림은 효과적으로 흐름 및 일시 정지의 두 가지 모드 중 하나로 작동합니다. 이러한 모드는 객체 모드와는 별개입니다. Readable
스트림은 흐름 모드인지 일시 정지 모드인지에 관계없이 객체 모드일 수도 있고 아닐 수도 있습니다.
- 흐름 모드에서는 기본 시스템에서 데이터가 자동으로 읽혀지고
EventEmitter
인터페이스를 통해 이벤트를 사용하여 가능한 한 빨리 애플리케이션에 제공됩니다. - 일시 정지 모드에서는 스트림에서 데이터 청크를 읽기 위해
stream.read()
메서드를 명시적으로 호출해야 합니다.
모든 Readable
스트림은 일시 정지 모드로 시작되지만 다음 방법 중 하나로 흐름 모드로 전환할 수 있습니다:
'data'
이벤트 핸들러 추가.stream.resume()
메서드 호출.stream.pipe()
메서드를 호출하여 데이터를Writable
로 보냅니다.
Readable
은 다음 중 하나를 사용하여 일시 정지 모드로 다시 전환할 수 있습니다.
- 파이프 대상이 없는 경우,
stream.pause()
메서드를 호출합니다. - 파이프 대상이 있는 경우, 모든 파이프 대상을 제거합니다. 여러 파이프 대상은
stream.unpipe()
메서드를 호출하여 제거할 수 있습니다.
기억해야 할 중요한 개념은 Readable
은 데이터를 소비하거나 무시하기 위한 메커니즘이 제공될 때까지 데이터를 생성하지 않는다는 것입니다. 소비 메커니즘이 비활성화되거나 제거되면 Readable
은 데이터 생성을 중단하려고 시도합니다.
이전 버전과의 호환성 때문에 'data'
이벤트 핸들러를 제거해도 스트림이 자동으로 일시 정지되지는 않습니다. 또한 파이프된 대상이 있는 경우 stream.pause()
를 호출해도 해당 대상이 드레인되고 더 많은 데이터를 요청하면 스트림이 계속 일시 정지된 상태를 유지하는 것은 보장하지 않습니다.
Readable
이 흐름 모드로 전환되었고 데이터를 처리할 수 있는 소비자가 없으면 해당 데이터는 손실됩니다. 예를 들어, 'data'
이벤트에 연결된 리스너 없이 readable.resume()
메서드가 호출되거나 스트림에서 'data'
이벤트 핸들러가 제거될 때 발생할 수 있습니다.
'readable'
이벤트 핸들러를 추가하면 스트림 흐름이 자동으로 중지되고, 데이터는 readable.read()
를 통해 소비해야 합니다. 'readable'
이벤트 핸들러가 제거되면 'data'
이벤트 핸들러가 있는 경우 스트림이 다시 흐름을 시작합니다.
세 가지 상태
Readable
스트림의 "두 가지 모드" 작동 방식은 Readable
스트림 구현 내부에서 발생하는 보다 복잡한 내부 상태 관리를 단순화한 추상화입니다.
특히, 주어진 시점에서 모든 Readable
은 세 가지 가능한 상태 중 하나에 있습니다.
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
readable.readableFlowing
이 null
일 때는 스트림의 데이터를 소비하기 위한 메커니즘이 제공되지 않습니다. 따라서 스트림은 데이터를 생성하지 않습니다. 이 상태에서 'data'
이벤트에 대한 리스너를 첨부하거나, readable.pipe()
메서드를 호출하거나, readable.resume()
메서드를 호출하면 readable.readableFlowing
이 true
로 전환되어 Readable
이 데이터가 생성될 때 활발하게 이벤트를 방출하기 시작합니다.
readable.pause()
, readable.unpipe()
를 호출하거나 백프레셔를 받으면 readable.readableFlowing
이 false
로 설정되어 이벤트 흐름이 일시적으로 중단되지만 데이터 생성이 중단되지는 않습니다. 이 상태에서는 'data'
이벤트에 대한 리스너를 첨부해도 readable.readableFlowing
이 true
로 전환되지 않습니다.
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing은 이제 false입니다.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing은 여전히 false입니다.
pass.write('ok') // 'data'를 방출하지 않습니다.
pass.resume() // 스트림이 'data'를 방출하려면 호출해야 합니다.
// readableFlowing은 이제 true입니다.
readable.readableFlowing
이 false
인 동안 데이터는 스트림의 내부 버퍼에 축적될 수 있습니다.
하나의 API 스타일 선택
Readable
스트림 API는 여러 Node.js 버전에서 발전했으며 스트림 데이터를 소비하는 여러 가지 방법을 제공합니다. 일반적으로 개발자는 데이터 소비 방법 중 하나를 선택해야 하며 단일 스트림에서 데이터를 소비하기 위해 여러 방법을 절대 사용해서는 안 됩니다. 특히, on('data')
, on('readable')
, pipe()
, 또는 비동기 반복자를 조합하여 사용하면 직관적이지 않은 동작이 발생할 수 있습니다.
클래스: stream.Readable
추가된 버전: v0.9.4
이벤트: 'close'
[기록]
버전 | 변경 사항 |
---|---|
v10.0.0 | destroy 시 'close' 가 발생할지 여부를 지정하는 emitClose 옵션 추가. |
v0.9.4 | 추가된 버전: v0.9.4 |
'close'
이벤트는 스트림과 기본 리소스(예: 파일 디스크립터)가 닫혔을 때 발생합니다. 이 이벤트는 더 이상 이벤트가 발생하지 않으며 더 이상의 계산이 수행되지 않음을 나타냅니다.
Readable
스트림은 emitClose
옵션으로 생성된 경우 항상 'close'
이벤트를 발생시킵니다.
이벤트: 'data'
추가된 버전: v0.9.4
chunk
<Buffer> | <string> | <any> 데이터 청크. 객체 모드에서 작동하지 않는 스트림의 경우, 청크는 문자열 또는Buffer
입니다. 객체 모드에 있는 스트림의 경우, 청크는null
이 아닌 모든 JavaScript 값일 수 있습니다.
'data'
이벤트는 스트림이 데이터 청크의 소유권을 소비자에게 양도할 때마다 발생합니다. 이는 readable.pipe()
, readable.resume()
을 호출하거나 'data'
이벤트에 리스너 콜백을 연결하여 스트림이 흐름 모드로 전환될 때마다 발생할 수 있습니다. 'data'
이벤트는 readable.read()
메서드가 호출되고 반환할 데이터 청크가 있는 경우에도 발생합니다.
명시적으로 일시 중지되지 않은 스트림에 'data'
이벤트 리스너를 연결하면 스트림이 흐름 모드로 전환됩니다. 그러면 데이터가 사용 가능해지자마자 전달됩니다.
리스너 콜백은 readable.setEncoding()
메서드를 사용하여 스트림에 기본 인코딩이 지정된 경우 데이터 청크를 문자열로 전달합니다. 그렇지 않으면 데이터가 Buffer
로 전달됩니다.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
})
이벤트: 'end'
추가된 버전: v0.9.4
'end'
이벤트는 스트림에서 더 이상 소비할 데이터가 없을 때 발생합니다.
데이터가 완전히 소비되지 않으면 'end'
이벤트는 발생하지 않습니다. 이는 스트림을 플로잉 모드로 전환하거나 모든 데이터가 소비될 때까지 stream.read()
를 반복적으로 호출하여 달성할 수 있습니다.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
})
readable.on('end', () => {
console.log('There will be no more data.')
})
이벤트: 'error'
추가된 버전: v0.9.4
'error'
이벤트는 언제든지 Readable
구현에 의해 발생할 수 있습니다. 일반적으로 기본 스트림이 내부 오류로 인해 데이터를 생성할 수 없거나 스트림 구현이 잘못된 데이터 청크를 푸시하려고 할 때 발생할 수 있습니다.
리스너 콜백에는 단일 Error
객체가 전달됩니다.
이벤트: 'pause'
추가된 버전: v0.9.4
'pause'
이벤트는 stream.pause()
가 호출되고 readableFlowing
이 false
가 아닐 때 발생합니다.
이벤트: 'readable'
[기록]
버전 | 변경 사항 |
---|---|
v10.0.0 | 'readable' 은 .push() 가 호출된 후 항상 다음 틱에서 발생합니다. |
v10.0.0 | 'readable' 을 사용하려면 .read() 를 호출해야 합니다. |
v0.9.4 | 추가된 버전: v0.9.4 |
'readable'
이벤트는 스트림에서 읽을 수 있는 데이터가 구성된 고수위 마크(state.highWaterMark
)까지 사용 가능할 때 발생합니다. 효과적으로 스트림에 버퍼 내에 새로운 정보가 있음을 나타냅니다. 이 버퍼 내에 데이터가 있는 경우 stream.read()
를 호출하여 해당 데이터를 검색할 수 있습니다. 또한 스트림의 끝에 도달했을 때도 'readable'
이벤트가 발생할 수 있습니다.
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// 이제 읽을 데이터가 있습니다.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
스트림의 끝에 도달한 경우 stream.read()
를 호출하면 null
이 반환되고 'end'
이벤트가 트리거됩니다. 이는 읽을 데이터가 전혀 없는 경우에도 마찬가지입니다. 예를 들어 다음 예에서 foo.txt
는 빈 파일입니다.
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
이 스크립트를 실행한 결과는 다음과 같습니다.
$ node test.js
readable: null
end
경우에 따라 'readable'
이벤트에 대한 리스너를 연결하면 내부 버퍼로 일정량의 데이터가 읽힙니다.
일반적으로 readable.pipe()
및 'data'
이벤트 메커니즘은 'readable'
이벤트보다 이해하기 쉽습니다. 그러나 'readable'
을 처리하면 처리량이 증가할 수 있습니다.
'readable'
및 'data'
가 동시에 사용되는 경우 'readable'
이 흐름 제어에서 우선 순위를 갖습니다. 즉, stream.read()
가 호출된 경우에만 'data'
가 발생합니다. readableFlowing
속성은 false
가 됩니다. 'readable'
이 제거될 때 'data'
리스너가 있으면 스트림이 흐르기 시작합니다. 즉, .resume()
을 호출하지 않고도 'data'
이벤트가 발생합니다.
Event: 'resume'
추가된 버전: v0.9.4
'resume'
이벤트는 stream.resume()
가 호출되었고 readableFlowing
이 true
가 아닐 때 발생합니다.
readable.destroy([error])
[기록]
버전 | 변경 사항 |
---|---|
v14.0.0 | 이미 파괴된 스트림에서 아무 작업도 하지 않도록 동작합니다. |
v8.0.0 | 추가된 버전: v8.0.0 |
스트림을 파괴합니다. 선택적으로 'error'
이벤트를 발생시키고, 'close'
이벤트도 발생시킵니다 (emitClose
가 false
로 설정되지 않은 경우). 이 호출 후, 읽을 수 있는 스트림은 내부 리소스를 해제하고 이후 push()
호출은 무시됩니다.
destroy()
가 호출된 후에는 추가 호출이 아무 작업을 하지 않으며 _destroy()
에서 발생하는 오류 외에는 더 이상 'error'
로 발생하지 않습니다.
구현자는 이 메서드를 재정의해서는 안 되며 대신 readable._destroy()
를 구현해야 합니다.
readable.closed
추가된 버전: v18.0.0
'close'
가 발생한 후에는 true
입니다.
readable.destroyed
추가된 버전: v8.0.0
readable.destroy()
가 호출된 후에는 true
입니다.
readable.isPaused()
추가된 버전: v0.11.14
- 반환 값: <boolean>
readable.isPaused()
메서드는 Readable
의 현재 작동 상태를 반환합니다. 이는 주로 readable.pipe()
메서드의 기본 메커니즘에 의해 사용됩니다. 대부분의 일반적인 경우에 이 메서드를 직접 사용할 필요는 없습니다.
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
추가된 버전: v0.9.4
- 반환값: <<this>>
readable.pause()
메서드는 플로잉 모드의 스트림이 'data'
이벤트를 내보내는 것을 중지시켜 플로잉 모드에서 벗어나도록 합니다. 사용 가능한 모든 데이터는 내부 버퍼에 유지됩니다.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`데이터 ${chunk.length}바이트 수신됨.`)
readable.pause()
console.log('1초 동안 추가 데이터가 없습니다.')
setTimeout(() => {
console.log('이제 데이터가 다시 흐르기 시작합니다.')
readable.resume()
}, 1000)
})
'readable'
이벤트 리스너가 있는 경우 readable.pause()
메서드는 아무런 효과가 없습니다.
readable.pipe(destination[, options])
추가된 버전: v0.9.4
destination
<stream.Writable> 데이터를 쓸 대상options
<Object> 파이프 옵션end
<boolean> 읽기 스트림이 끝나면 쓰기 스트림을 종료합니다. 기본값:true
.
반환값: <stream.Writable> 대상 스트림.
Duplex
또는Transform
스트림인 경우 파이프 체인을 구성할 수 있습니다.
readable.pipe()
메서드는 Writable
스트림을 readable
스트림에 연결하여 플로잉 모드로 자동 전환하고 모든 데이터를 연결된 Writable
스트림으로 푸시합니다. 데이터 흐름은 대상 Writable
스트림이 더 빠른 Readable
스트림에 압도되지 않도록 자동으로 관리됩니다.
다음 예제에서는 readable
스트림의 모든 데이터를 file.txt
라는 파일로 파이프합니다.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// readable 스트림의 모든 데이터가 'file.txt'로 이동합니다.
readable.pipe(writable)
단일 Readable
스트림에 여러 Writable
스트림을 연결할 수 있습니다.
readable.pipe()
메서드는 대상 스트림에 대한 참조를 반환하여 파이프된 스트림 체인을 설정할 수 있도록 합니다.
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
기본적으로 소스 Readable
스트림이 'end'
이벤트를 내보내면 대상 Writable
스트림에서 stream.end()
가 호출되어 더 이상 쓸 수 없게 됩니다. 이 기본 동작을 비활성화하려면 end
옵션을 false
로 전달하여 대상 스트림을 열어둘 수 있습니다.
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Goodbye\n')
})
한 가지 중요한 주의 사항은 Readable
스트림이 처리 중에 오류를 내보내면 Writable
대상이 자동으로 닫히지 않는다는 것입니다. 오류가 발생하면 메모리 누수를 방지하기 위해 각 스트림을 수동으로 닫아야 합니다.
process.stderr
및 process.stdout
Writable
스트림은 지정된 옵션에 관계없이 Node.js 프로세스가 종료될 때까지 닫히지 않습니다.
readable.read([size])
추가된 버전: v0.9.4
readable.read()
메서드는 내부 버퍼에서 데이터를 읽어 반환합니다. 읽을 데이터가 없으면 null
이 반환됩니다. 기본적으로 데이터는 Buffer
객체로 반환되지만, readable.setEncoding()
메서드를 사용하여 인코딩이 지정되었거나 스트림이 객체 모드에서 작동하는 경우에는 예외입니다.
선택적인 size
인자는 읽을 바이트 수를 지정합니다. size
바이트를 읽을 수 없으면 스트림이 종료된 경우가 아니라면 null
이 반환되고, 스트림이 종료된 경우에는 내부 버퍼에 남아 있는 모든 데이터가 반환됩니다.
size
인자가 지정되지 않은 경우, 내부 버퍼에 포함된 모든 데이터가 반환됩니다.
size
인자는 1 GiB 이하이어야 합니다.
readable.read()
메서드는 일시 중지 모드에서 작동하는 Readable
스트림에서만 호출해야 합니다. 흐름 모드에서는 내부 버퍼가 완전히 비워질 때까지 readable.read()
가 자동으로 호출됩니다.
const readable = getReadableStreamSomehow()
// 데이터가 버퍼링됨에 따라 'readable'이 여러 번 트리거될 수 있습니다.
readable.on('readable', () => {
let chunk
console.log('스트림을 읽을 수 있습니다. (버퍼에 새 데이터가 수신됨)')
// 현재 사용 가능한 모든 데이터를 읽는지 확인하기 위해 루프를 사용합니다.
while (null !== (chunk = readable.read())) {
console.log(`데이터 ${chunk.length}바이트를 읽었습니다...`)
}
})
// 더 이상 사용 가능한 데이터가 없으면 'end'가 한 번 트리거됩니다.
readable.on('end', () => {
console.log('스트림 끝에 도달했습니다.')
})
readable.read()
를 각 호출하면 데이터 청크 또는 null
이 반환되며, 이는 해당 시점에서 읽을 데이터가 더 이상 없음을 나타냅니다. 이러한 청크는 자동으로 연결되지 않습니다. 단일 read()
호출이 모든 데이터를 반환하지 않으므로 모든 데이터가 검색될 때까지 청크를 계속 읽으려면 while 루프가 필요할 수 있습니다. 큰 파일을 읽을 때 .read()
는 일시적으로 null
을 반환할 수 있으며, 이는 버퍼링된 모든 콘텐츠를 소비했지만 아직 버퍼링할 데이터가 더 있을 수 있음을 나타냅니다. 이러한 경우 버퍼에 더 많은 데이터가 있으면 새 'readable'
이벤트가 발생하고 'end'
이벤트는 데이터 전송의 끝을 나타냅니다.
따라서 readable
에서 파일의 전체 내용을 읽으려면 여러 'readable'
이벤트에서 청크를 수집해야 합니다.
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
객체 모드의 Readable
스트림은 size
인자의 값에 관계없이 readable.read(size)
호출에서 항상 단일 항목을 반환합니다.
readable.read()
메서드가 데이터 청크를 반환하면 'data'
이벤트도 발생합니다.
'end'
이벤트가 발생한 후 stream.read([size])
를 호출하면 null
이 반환됩니다. 런타임 오류는 발생하지 않습니다.
readable.readable
추가된 버전: v11.4.0
스트림이 소멸되지 않았거나 'error'
또는 'end'
를 내보내지 않았음을 의미하는 readable.read()
를 호출하는 것이 안전하면 true
입니다.
readable.readableAborted
추가된 버전: v16.8.0
스트림이 'end'
를 내보내기 전에 소멸되거나 오류가 발생했는지 여부를 반환합니다.
readable.readableDidRead
추가된 버전: v16.7.0, v14.18.0
'data'
가 내보내졌는지 여부를 반환합니다.
readable.readableEncoding
추가된 버전: v12.7.0
주어진 Readable
스트림의 encoding
속성에 대한 getter입니다. encoding
속성은 readable.setEncoding()
메서드를 사용하여 설정할 수 있습니다.
readable.readableEnded
추가된 버전: v12.9.0
'end'
이벤트가 내보내질 때 true
가 됩니다.
readable.errored
추가된 버전: v18.0.0
스트림이 오류와 함께 소멸된 경우 오류를 반환합니다.
readable.readableFlowing
추가된 버전: v9.4.0
이 속성은 세 가지 상태 섹션에 설명된 대로 Readable
스트림의 현재 상태를 반영합니다.
readable.readableHighWaterMark
Added in: v9.3.0
이 Readable
을 만들 때 전달된 highWaterMark
값을 반환합니다.
readable.readableLength
Added in: v9.4.0
이 속성은 읽을 준비가 된 큐에 있는 바이트(또는 객체) 수를 포함합니다. 이 값은 highWaterMark
상태에 대한 내부 조사 데이터를 제공합니다.
readable.readableObjectMode
Added in: v12.3.0
주어진 Readable
스트림의 objectMode
속성에 대한 getter입니다.
readable.resume()
[History]
Version | Changes |
---|---|
v10.0.0 | 'readable' 이벤트 리스너가 있는 경우 resume() 은 아무 효과가 없습니다. |
v0.9.4 | Added in: v0.9.4 |
- 반환 값: <this>
readable.resume()
메서드는 명시적으로 일시 중지된 Readable
스트림이 'data'
이벤트를 다시 내보내도록 하여 스트림을 흐름 모드로 전환합니다.
readable.resume()
메서드를 사용하여 실제로 데이터를 처리하지 않고 스트림에서 데이터를 완전히 사용할 수 있습니다.
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('끝에 도달했지만 아무것도 읽지 않았습니다.')
})
'readable'
이벤트 리스너가 있는 경우 readable.resume()
메서드는 아무 효과가 없습니다.
readable.setEncoding(encoding)
Added in: v0.9.4
readable.setEncoding()
메서드는 Readable
스트림에서 읽은 데이터에 대한 문자 인코딩을 설정합니다.
기본적으로 인코딩이 할당되지 않으며 스트림 데이터는 Buffer
객체로 반환됩니다. 인코딩을 설정하면 스트림 데이터가 Buffer
객체가 아닌 지정된 인코딩의 문자열로 반환됩니다. 예를 들어 readable.setEncoding('utf8')
을 호출하면 출력 데이터가 UTF-8 데이터로 해석되고 문자열로 전달됩니다. readable.setEncoding('hex')
를 호출하면 데이터가 16진수 문자열 형식으로 인코딩됩니다.
Readable
스트림은 스트림에서 가져온 Buffer
객체로 간단하게 가져올 경우 잘못 디코딩될 수 있는 스트림을 통해 전달되는 멀티바이트 문자를 적절하게 처리합니다.
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('문자열 데이터 %d자를 가져왔습니다:', chunk.length)
})
readable.unpipe([destination])
추가된 버전: v0.9.4
destination
<stream.Writable> 연결 해제할 특정 스트림 (선택 사항)- 반환값: <this>
readable.unpipe()
메서드는 stream.pipe()
메서드를 사용하여 이전에 연결된 Writable
스트림을 분리합니다.
destination
이 지정되지 않은 경우 모든 파이프가 분리됩니다.
destination
이 지정되었지만 해당 파이프가 설정되지 않은 경우 메서드는 아무 작업도 수행하지 않습니다.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// readable의 모든 데이터가 'file.txt'로 이동하지만,
// 처음 1초 동안만 그렇습니다.
readable.pipe(writable)
setTimeout(() => {
console.log('file.txt 쓰기를 중단합니다.')
readable.unpipe(writable)
console.log('파일 스트림을 수동으로 닫습니다.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[기록]
버전 | 변경 사항 |
---|---|
v22.0.0, v20.13.0 | 이제 chunk 인수가 TypedArray 또는 DataView 인스턴스가 될 수 있습니다. |
v8.0.0 | 이제 chunk 인수가 Uint8Array 인스턴스가 될 수 있습니다. |
v0.9.11 | 추가된 버전: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 읽기 큐에 다시 추가할 데이터 청크입니다. 객체 모드에서 작동하지 않는 스트림의 경우chunk
는 <string>, <Buffer>, <TypedArray>, <DataView> 또는null
이어야 합니다. 객체 모드 스트림의 경우chunk
는 모든 JavaScript 값이 될 수 있습니다.encoding
<string> 문자열 청크의 인코딩입니다.'utf8'
또는'ascii'
와 같은 유효한Buffer
인코딩이어야 합니다.
chunk
를 null
로 전달하면 스트림의 끝(EOF)을 알리고 readable.push(null)
과 동일하게 작동하며, 이후에는 더 이상 데이터를 쓸 수 없습니다. EOF 신호는 버퍼의 끝에 배치되며 버퍼링된 데이터는 계속 플러시됩니다.
readable.unshift()
메서드는 데이터 청크를 내부 버퍼로 다시 밀어 넣습니다. 이는 스트림이 소스에서 낙관적으로 가져온 일부 데이터를 "소비 취소"해야 하는 코드에서 사용되는 특정 상황에서 유용합니다. 이렇게 하면 데이터를 다른 당사자에게 전달할 수 있습니다.
stream.unshift(chunk)
메서드는 'end'
이벤트가 발생한 후에는 호출할 수 없으며, 런타임 오류가 발생합니다.
stream.unshift()
를 사용하는 개발자는 종종 대신 Transform
스트림 사용으로 전환하는 것을 고려해야 합니다. 자세한 내용은 스트림 구현자를 위한 API 섹션을 참조하세요.
// \n\n으로 구분된 헤더를 가져옵니다.
// 너무 많이 가져오면 unshift()를 사용합니다.
// (오류, 헤더, 스트림)으로 콜백을 호출합니다.
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// 헤더 경계를 찾았습니다.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// unshifting 전에 'readable' 리스너를 제거합니다.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// 이제 메시지의 본문을 스트림에서 읽을 수 있습니다.
callback(null, header, stream)
return
}
// 여전히 헤더를 읽고 있습니다.
header += str
}
}
}
stream.push(chunk)
와 달리 stream.unshift(chunk)
는 스트림의 내부 읽기 상태를 재설정하여 읽기 프로세스를 종료하지 않습니다. 이로 인해 사용자 지정 스트림에서 stream._read()
](/ko/api/stream#readable_readsize) 구현 내에서 읽기 중에 readable.unshift()
가 호출되면 예기치 않은 결과가 발생할 수 있습니다. readable.unshift()
호출 직후에 stream.push('')
를 호출하면 읽기 상태가 적절하게 재설정되지만 읽기를 수행하는 과정에서 readable.unshift()
를 호출하지 않는 것이 가장 좋습니다.
readable.wrap(stream)
추가된 버전: v0.9.4
Node.js 0.10 이전에는 스트림이 현재 정의된 node:stream
모듈 API 전체를 구현하지 않았습니다. (자세한 내용은 호환성을 참조하세요.)
'data'
이벤트를 발생시키고 권고 사항인 stream.pause()
메서드를 가진 이전 Node.js 라이브러리를 사용할 때, readable.wrap()
메서드를 사용하여 이전 스트림을 데이터 소스로 사용하는 Readable
스트림을 만들 수 있습니다.
readable.wrap()
을 사용해야 할 경우는 거의 없지만 이전 Node.js 애플리케이션 및 라이브러리와 상호 작용하는 편의를 위해 제공되었습니다.
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // 등.
})
readable[Symbol.asyncIterator]()
[기록]
버전 | 변경 사항 |
---|---|
v11.14.0 | Symbol.asyncIterator 지원이 더 이상 실험적이지 않습니다. |
v10.0.0 | 추가된 버전: v10.0.0 |
- 반환 값: 스트림을 완전히 소비하는 <AsyncIterator>
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
루프가 break
, return
또는 throw
로 종료되면 스트림이 파괴됩니다. 즉, 스트림을 반복하면 스트림이 완전히 소비됩니다. 스트림은 highWaterMark
옵션과 동일한 크기의 청크로 읽혀집니다. 위 코드 예제에서 fs.createReadStream()
에 highWaterMark
옵션이 제공되지 않았기 때문에 파일에 64KiB 미만의 데이터가 있는 경우 데이터는 단일 청크에 있습니다.
readable[Symbol.asyncDispose]()
추가된 버전: v20.4.0, v18.18.0
AbortError
와 함께 readable.destroy()
를 호출하고 스트림이 완료되면 이행되는 프로미스를 반환합니다.
readable.compose(stream[, options])
추가된 버전: v19.1.0, v18.13.0
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> 시그널이 중단되면 스트림을 파괴할 수 있습니다.
반환 값: <Duplex>
stream
스트림으로 구성된 스트림입니다.
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // prints ['this', 'is', 'compose', 'as', 'operator']
자세한 내용은 stream.compose
를 참조하십시오.
readable.iterator([options])
추가된 버전: v16.3.0
options
<Object>destroyOnReturn
<boolean>false
로 설정하면 비동기 반복자에서return
을 호출하거나break
,return
또는throw
를 사용하여for await...of
반복을 종료해도 스트림이 파괴되지 않습니다. 기본값:true
.
반환 값: 스트림을 소비하기 위한 <AsyncIterator>입니다.
이 메서드로 생성된 반복자는 for await...of
루프가 return
, break
또는 throw
로 종료된 경우나, 반복 중에 스트림에서 오류가 발생한 경우 스트림을 파괴해야 하는지 여부를 선택하여 스트림 파괴를 취소할 수 있는 옵션을 사용자에게 제공합니다.
const { Readable } = require('node:stream')
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 2를 출력한 다음 3을 출력합니다.
}
console.log(readable.destroyed) // True, 스트림이 완전히 소비되었습니다.
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]))
await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}
showBoth()
readable.map(fn[, options])
[History]
버전 | 변경 사항 |
---|---|
v20.7.0, v18.19.0 | 옵션에 highWaterMark 추가. |
v17.4.0, v16.14.0 | 추가됨: v17.4.0, v16.14.0 |
fn
<Function> | <AsyncFunction> 스트림의 모든 청크를 매핑하는 함수.data
<any> 스트림에서 가져온 데이터 청크.options
<Object>signal
<AbortSignal> 스트림이 파괴되면 중단되어fn
호출을 조기에 중단할 수 있습니다.
options
<Object>concurrency
<number> 스트림에서 한 번에 호출할fn
의 최대 동시 호출 횟수. 기본값:1
.highWaterMark
<number> 매핑된 항목의 사용자 소비를 기다리는 동안 버퍼링할 항목 수. 기본값:concurrency * 2 - 1
.signal
<AbortSignal> 신호가 중단되면 스트림을 파괴할 수 있습니다.
반환 값: <Readable> 함수
fn
으로 매핑된 스트림.
이 메서드를 사용하면 스트림을 매핑할 수 있습니다. fn
함수는 스트림의 모든 청크에 대해 호출됩니다. fn
함수가 프로미스를 반환하는 경우, 해당 프로미스는 결과 스트림으로 전달되기 전에 await
됩니다.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// 동기 매퍼 사용.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
console.log(chunk) // 2, 4, 6, 8
}
// 비동기 매퍼 사용, 한 번에 최대 2개의 쿼리 실행.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
domain => resolver.resolve4(domain),
{ concurrency: 2 }
)
for await (const result of dnsResults) {
console.log(result) // resolver.resolve4의 DNS 결과 기록.
}
readable.filter(fn[, options])
[History]
버전 | 변경 사항 |
---|---|
v20.7.0, v18.19.0 | 옵션에 highWaterMark 추가됨. |
v17.4.0, v16.14.0 | 추가됨: v17.4.0, v16.14.0 |
fn
<Function> | <AsyncFunction> 스트림에서 청크를 필터링하는 함수.data
<any> 스트림에서 가져온 데이터 청크.options
<Object>signal
<AbortSignal> 스트림이 파괴되면 중단되어fn
호출을 조기에 중단할 수 있습니다.
options
<Object>concurrency
<number> 스트림에서 한 번에 호출할fn
의 최대 동시 호출 횟수. 기본값:1
.highWaterMark
<number> 필터링된 항목을 사용자가 소비하기를 기다리는 동안 버퍼링할 항목 수. 기본값:concurrency * 2 - 1
.signal
<AbortSignal> 신호가 중단되면 스트림을 파괴할 수 있습니다.
반환: <Readable> 조건자
fn
으로 필터링된 스트림.
이 메서드는 스트림을 필터링할 수 있도록 합니다. 스트림의 각 청크에 대해 fn
함수가 호출되고 truthy 값을 반환하면 청크가 결과 스트림으로 전달됩니다. fn
함수가 Promise를 반환하면 해당 Promise는 await
됩니다.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// 동기 조건자를 사용하는 경우.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// 비동기 조건자를 사용하여 한 번에 최대 2개의 쿼리를 만듭니다.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address.ttl > 60
},
{ concurrency: 2 }
)
for await (const result of dnsResults) {
// 해결된 DNS 레코드에 60초 이상인 도메인을 기록합니다.
console.log(result)
}
readable.forEach(fn[, options])
추가된 버전: v17.5.0, v16.15.0
fn
<Function> | <AsyncFunction> 스트림의 각 청크에 대해 호출할 함수입니다.data
<any> 스트림에서 가져온 데이터 청크입니다.options
<Object>signal
<AbortSignal> 스트림이 파괴되면 중단되어fn
호출을 조기에 중단할 수 있습니다.
options
<Object>concurrency
<number> 스트림에서 동시에 호출할fn
의 최대 동시 호출 횟수입니다. 기본값:1
.signal
<AbortSignal> 신호가 중단되면 스트림을 파괴할 수 있습니다.
반환 값: <Promise> 스트림이 완료되었을 때의 Promise입니다.
이 메서드를 사용하면 스트림을 반복할 수 있습니다. 스트림의 각 청크에 대해 fn
함수가 호출됩니다. fn
함수가 Promise를 반환하는 경우 해당 Promise는 await
됩니다.
이 메서드는 선택적으로 청크를 동시에 처리할 수 있다는 점에서 for await...of
루프와 다릅니다. 또한, forEach
반복은 signal
옵션을 전달하고 관련 AbortController
를 중단해야만 중단할 수 있는 반면, for await...of
는 break
또는 return
으로 중단할 수 있습니다. 어느 경우든 스트림은 파괴됩니다.
이 메서드는 기본 메커니즘에서 readable
이벤트를 사용하고 동시 fn
호출 수를 제한할 수 있다는 점에서 'data'
이벤트를 수신하는 것과 다릅니다.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// 동기적 술어를 사용합니다.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// 비동기적 술어를 사용하여 한 번에 최대 2개의 쿼리를 만듭니다.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
await dnsResults.forEach(result => {
// `for await (const result of dnsResults)`와 유사하게 결과를 로깅합니다.
console.log(result)
})
console.log('완료됨') // 스트림이 완료되었습니다.
readable.toArray([options])
추가된 버전: v17.5.0, v16.15.0
options
<Object>signal
<AbortSignal> 시그널이 중단되면 toArray 작업을 취소할 수 있습니다.
반환값: <Promise> 스트림의 내용을 담은 배열을 포함하는 프로미스입니다.
이 메서드를 사용하면 스트림의 내용을 쉽게 얻을 수 있습니다.
이 메서드는 전체 스트림을 메모리로 읽어들이기 때문에 스트림의 장점을 무효화합니다. 이 메서드는 스트림을 사용하는 기본 방법이 아닌 상호 운용성 및 편의성을 위한 것입니다.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]
// .map을 사용하여 DNS 쿼리를 동시에 수행하고
// toArray를 사용하여 결과를 배열로 수집
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
.map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
.toArray()
readable.some(fn[, options])
추가된 버전: v17.5.0, v16.15.0
fn
<Function> | <AsyncFunction> 스트림의 각 청크에 호출할 함수입니다.data
<any> 스트림의 데이터 청크입니다.options
<Object>signal
<AbortSignal> 스트림이 삭제되면 중단되어fn
호출을 조기에 중단할 수 있습니다.
options
<Object>concurrency
<number> 스트림에서 한 번에 호출할 수 있는fn
의 최대 동시 호출 횟수입니다. 기본값:1
.signal
<AbortSignal> 시그널이 중단되면 스트림을 삭제할 수 있습니다.
반환값: <Promise>
fn
이 하나 이상의 청크에 대해 truthy 값을 반환한 경우true
로 평가되는 프로미스입니다.
이 메서드는 Array.prototype.some
과 유사하며, 반환값이 true
(또는 truthy 값)가 될 때까지 스트림의 각 청크에 fn
을 호출합니다. 청크에 대한 fn
호출의 반환값이 truthy이면 스트림이 삭제되고 프로미스가 true
로 이행됩니다. 청크에 대한 fn
호출 중 어느 것도 truthy 값을 반환하지 않으면 프로미스가 false
로 이행됩니다.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// 동기적 술어를 사용합니다.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false
// 비동기적 술어를 사용하여 한 번에 최대 2개의 파일 검사를 수행합니다.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(anyBigFile) // 목록에 있는 파일 중 하나라도 1MB보다 크면 `true`입니다.
console.log('done') // 스트림이 완료되었습니다.
readable.find(fn[, options])
추가된 버전: v17.5.0, v16.17.0
fn
<Function> | <AsyncFunction> 스트림의 각 청크에서 호출할 함수입니다.data
<any> 스트림에서 가져온 데이터 청크입니다.options
<Object>signal
<AbortSignal> 스트림이 제거되면 중단되어fn
호출을 조기에 중단할 수 있습니다.
options
<Object>concurrency
<number> 스트림에서 한 번에 호출할fn
의 최대 동시 호출 수입니다. 기본값:1
.signal
<AbortSignal> 신호가 중단되면 스트림을 제거할 수 있습니다.
반환 값: <Promise>
fn
이 참 값으로 평가된 첫 번째 청크 또는 요소가 없으면undefined
로 평가되는 프로미스입니다.
이 메서드는 Array.prototype.find
와 유사하며 스트림의 각 청크에 대해 fn
을 호출하여 fn
에 대한 참 값을 가진 청크를 찾습니다. fn
호출의 대기 반환 값이 참이면 스트림이 제거되고 fn
이 참 값을 반환한 값으로 프로미스가 충족됩니다. 청크에 대한 모든 fn
호출이 거짓 값을 반환하면 프로미스는 undefined
로 충족됩니다.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// 동기 조건자를 사용합니다.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined
// 비동기 조건자를 사용하여 한 번에 최대 2개의 파일 검사를 수행합니다.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(foundBigFile) // 목록에 1MB보다 큰 파일이 있는 경우 큰 파일의 파일 이름
console.log('done') // 스트림이 완료되었습니다.
readable.every(fn[, options])
추가된 버전: v17.5.0, v16.15.0
fn
<함수> | <AsyncFunction> 스트림의 각 청크에서 호출할 함수입니다.data
<any> 스트림의 데이터 청크입니다.options
<Object>signal
<AbortSignal> 스트림이 파괴되면 중단되어fn
호출을 조기에 중단할 수 있습니다.
options
<Object>concurrency
<number> 스트림에서 한 번에 호출할fn
의 최대 동시 호출 횟수입니다. 기본값:1
.signal
<AbortSignal> 시그널이 중단되면 스트림을 파괴할 수 있습니다.
반환값: <Promise>
fn
이 모든 청크에 대해 truthy 값을 반환한 경우true
로 평가되는 프로미스입니다.
이 메서드는 Array.prototype.every
와 유사하며 스트림의 각 청크에서 fn
을 호출하여 모든 기다린 반환 값이 fn
에 대한 truthy 값인지 확인합니다. 청크에서 fn
호출에 대해 기다린 반환 값이 falsy이면 스트림이 파괴되고 프로미스는 false
로 이행됩니다. 청크에 대한 모든 fn
호출이 truthy 값을 반환하면 프로미스가 true
로 이행됩니다.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// 동기 술어를 사용합니다.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true
// 비동기 술어를 사용하여 한 번에 최대 2개의 파일 검사를 수행합니다.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
// 목록의 모든 파일이 1MiB보다 크면 `true`입니다.
console.log(allBigFiles)
console.log('done') // 스트림이 완료되었습니다.
readable.flatMap(fn[, options])
Added in: v17.5.0, v16.15.0
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> 스트림의 모든 청크에 매핑할 함수.data
<any> 스트림에서 온 데이터 청크.options
<Object>signal
<AbortSignal> 스트림이 파괴되면 중단되어fn
호출을 조기에 중단할 수 있습니다.
options
<Object>concurrency
<number> 스트림에서 한 번에 호출할fn
의 최대 동시 호출 횟수입니다. 기본값:1
.signal
<AbortSignal> 신호가 중단되면 스트림을 파괴할 수 있습니다.
반환값: <Readable> 함수
fn
으로 플랫 매핑된 스트림입니다.
이 메서드는 스트림의 각 청크에 주어진 콜백을 적용한 다음 결과를 평탄화하여 새 스트림을 반환합니다.
fn
에서 스트림 또는 다른 반복 가능 또는 비동기 반복 가능 객체를 반환할 수 있으며 결과 스트림은 반환된 스트림으로 병합(평탄화)됩니다.
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'
// 동기 매퍼 사용.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// 비동기 매퍼를 사용하여 4개의 파일 내용 결합
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
createReadStream(fileName)
)
for await (const result of concatResult) {
// 여기에는 4개의 파일 모두의 내용(모든 청크)이 포함됩니다.
console.log(result)
}
readable.drop(limit[, options])
Added in: v17.5.0, v16.15.0
[Stable: 1 - Experimental]
Stable: 1 Stability: 1 - 실험적
limit
<number> 읽을 수 있는 항목에서 삭제할 청크 수입니다.options
<Object>signal
<AbortSignal> 신호가 중단되면 스트림을 제거할 수 있습니다.
반환 값: <Readable>
limit
청크가 삭제된 스트림입니다.
이 메서드는 처음 limit
청크가 삭제된 새 스트림을 반환합니다.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])
Added in: v17.5.0, v16.15.0
[Stable: 1 - Experimental]
Stable: 1 Stability: 1 - 실험적
limit
<number> 읽을 수 있는 항목에서 가져올 청크 수입니다.options
<Object>signal
<AbortSignal> 신호가 중단되면 스트림을 제거할 수 있습니다.
반환 값: <Readable>
limit
청크가 가져온 스트림입니다.
이 메서드는 처음 limit
청크를 포함하는 새 스트림을 반환합니다.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])
Added in: v17.5.0, v16.15.0
[Stable: 1 - Experimental]
Stable: 1 Stability: 1 - 실험적
fn
<Function> | <AsyncFunction> 스트림의 모든 청크에 대해 호출할 리듀서 함수입니다.previous
<any>fn
에 대한 마지막 호출에서 얻은 값, 지정된 경우initial
값, 그렇지 않으면 스트림의 첫 번째 청크입니다.data
<any> 스트림의 데이터 청크입니다.options
<Object>signal
<AbortSignal> 스트림이 제거되면 중단되어fn
호출을 조기에 중단할 수 있습니다.
initial
<any> 축소에 사용할 초기 값입니다.options
<Object>signal
<AbortSignal> 신호가 중단되면 스트림을 제거할 수 있습니다.
반환 값: <Promise> 축소의 최종 값에 대한 프로미스입니다.
이 메서드는 스트림의 각 청크에서 fn
을 순서대로 호출하고 이전 요소에 대한 계산 결과를 전달합니다. 축소의 최종 값에 대한 프로미스를 반환합니다.
initial
값이 제공되지 않으면 스트림의 첫 번째 청크가 초기 값으로 사용됩니다. 스트림이 비어 있으면 프로미스는 ERR_INVALID_ARGS
코드 속성과 함께 TypeError
로 거부됩니다.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file))
return totalSize + size
}, 0)
console.log(folderSize)
리듀서 함수는 스트림 요소를 요소별로 반복합니다. 즉, concurrency
매개변수 또는 병렬 처리가 없습니다. reduce
를 동시에 수행하려면 readable.map
메서드로 비동기 함수를 추출할 수 있습니다.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir)
.map(file => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0)
console.log(folderSize)
이중 및 변환 스트림
클래스: stream.Duplex
[기록]
버전 | 변경 사항 |
---|---|
v6.8.0 | 이제 Duplex 인스턴스가 instanceof stream.Writable 을 확인할 때 true 를 반환합니다. |
v0.9.4 | 추가됨: v0.9.4 |
이중 스트림은 Readable
및 Writable
인터페이스를 모두 구현하는 스트림입니다.
Duplex
스트림의 예는 다음과 같습니다.
duplex.allowHalfOpen
추가됨: v0.9.4
false
인 경우 스트림은 읽기 가능 측면이 끝나면 쓰기 가능 측면을 자동으로 종료합니다. 초기에는 기본값이 true
인 allowHalfOpen
생성자 옵션에 의해 설정됩니다.
이는 기존 Duplex
스트림 인스턴스의 반개방 동작을 변경하기 위해 수동으로 변경할 수 있지만, 'end'
이벤트가 발생하기 전에 변경해야 합니다.
클래스: stream.Transform
추가됨: v0.9.4
변환 스트림은 출력이 어떤 식으로든 입력과 관련된 Duplex
스트림입니다. 모든 Duplex
스트림과 마찬가지로 Transform
스트림은 Readable
및 Writable
인터페이스를 모두 구현합니다.
Transform
스트림의 예는 다음과 같습니다.
transform.destroy([error])
[기록]
버전 | 변경 사항 |
---|---|
v14.0.0 | 이미 파괴된 스트림에서 아무 작업도 하지 않습니다. |
v8.0.0 | 추가됨: v8.0.0 |
스트림을 파괴하고 선택적으로 'error'
이벤트를 발생시킵니다. 이 호출 후 변환 스트림은 모든 내부 리소스를 해제합니다. 구현자는 이 메서드를 재정의해서는 안 되며 대신 readable._destroy()
를 구현해야 합니다. Transform
에 대한 _destroy()
의 기본 구현은 emitClose
가 false로 설정되지 않은 한 'close'
도 발생시킵니다.
destroy()
가 호출되면 추가 호출은 아무 작업도 하지 않으며 _destroy()
에서 발생하는 오류를 제외하고는 더 이상 'error'
로 오류가 발생하지 않을 수 있습니다.
stream.duplexPair([options])
추가된 버전: v22.6.0, v20.17.0
유틸리티 함수인 duplexPair
는 두 개의 항목이 있는 배열을 반환하며, 각 항목은 다른 쪽과 연결된 Duplex
스트림입니다.
const [sideA, sideB] = duplexPair()
한 스트림에 작성된 내용은 다른 스트림에서 읽을 수 있게 됩니다. 이는 클라이언트가 작성한 데이터가 서버에서 읽을 수 있게 되는 네트워크 연결과 유사한 동작을 제공합니다.
Duplex 스트림은 대칭적입니다. 어느 쪽을 사용하든 동작에 차이가 없습니다.
stream.finished(stream[, options], callback)
[기록]
버전 | 변경 사항 |
---|---|
v19.5.0 | ReadableStream 및 WritableStream 에 대한 지원이 추가되었습니다. |
v15.11.0 | signal 옵션이 추가되었습니다. |
v14.0.0 | finished(stream, cb) 는 콜백을 호출하기 전에 'close' 이벤트를 기다립니다. 구현은 레거시 스트림을 감지하고 'close' 를 발생시킬 것으로 예상되는 스트림에만 이 동작을 적용하려고 시도합니다. |
v14.0.0 | Readable 스트림에서 'end' 전에 'close' 를 발생시키면 ERR_STREAM_PREMATURE_CLOSE 오류가 발생합니다. |
v14.0.0 | finished(stream, cb) 호출 전에 이미 완료된 스트림에서 콜백이 호출됩니다. |
v10.0.0 | 추가된 버전: v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> 읽기 가능 및/또는 쓰기 가능한 스트림/웹 스트림입니다.options
<Object>error
<boolean>false
로 설정하면emit('error', err)
호출이 완료된 것으로 처리되지 않습니다. 기본값:true
.readable
<boolean>false
로 설정하면 스트림이 여전히 읽을 수 있더라도 스트림이 종료될 때 콜백이 호출됩니다. 기본값:true
.writable
<boolean>false
로 설정하면 스트림이 여전히 쓸 수 있더라도 스트림이 종료될 때 콜백이 호출됩니다. 기본값:true
.signal
<AbortSignal> 스트림 완료 대기를 중단할 수 있습니다. 신호가 중단되면 기본 스트림은 중단되지 않습니다. 콜백은AbortError
와 함께 호출됩니다. 이 함수에서 추가된 모든 등록된 리스너도 제거됩니다.
callback
<Function> 선택적 오류 인수를 취하는 콜백 함수입니다.반환 값: 모든 등록된 리스너를 제거하는 정리 함수 <Function> 입니다.
스트림이 더 이상 읽을 수 없거나, 쓸 수 없거나, 오류가 발생하거나, 조기 종료된 경우 알림을 받는 함수입니다.
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('스트림 실패.', err)
} else {
console.log('스트림 읽기가 완료되었습니다.')
}
})
rs.resume() // 스트림을 비웁니다.
특히 스트림이 조기에 파괴되는 (중단된 HTTP 요청과 같은) 오류 처리 시나리오에서 유용하며, 'end'
또는 'finish'
를 발생시키지 않습니다.
finished
API는 Promise 버전을 제공합니다.
stream.finished()
는 callback
이 호출된 후에도 (특히 'error'
, 'end'
, 'finish'
및 'close'
) 매달린 이벤트 리스너를 남겨둡니다. 이는 잘못된 스트림 구현으로 인한 예상치 못한 'error'
이벤트가 예상치 못한 충돌을 일으키지 않도록 하기 위함입니다. 이것이 원치 않는 동작이면 콜백에서 반환된 정리 함수를 호출해야 합니다.
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[기록]
버전 | 변경 사항 |
---|---|
v19.7.0, v18.16.0 | 웹 스트림 지원 추가됨. |
v18.0.0 | callback 인수에 유효하지 않은 콜백을 전달하면 이제 ERR_INVALID_CALLBACK 대신 ERR_INVALID_ARG_TYPE 을 throw합니다. |
v14.0.0 | pipeline(..., cb) 는 콜백을 호출하기 전에 'close' 이벤트가 발생하기를 기다립니다. 이 구현은 기존 스트림을 감지하고 'close' 를 내보낼 것으로 예상되는 스트림에만 이 동작을 적용하려고 합니다. |
v13.10.0 | 비동기 생성기 지원 추가됨. |
v10.0.0 | v10.0.0에서 추가됨 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- 반환 값: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- 반환 값: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- 반환 값: <AsyncIterable> | <Promise>
callback
<Function> 파이프라인이 완전히 완료되면 호출됩니다.err
<Error>val
destination
에서 반환된Promise
의 확인된 값입니다.
반환 값: <Stream>
스트림과 생성기 사이에서 파이프를 연결하고 오류를 전달하고, 파이프라인이 완료되면 제대로 정리하고 콜백을 제공하는 모듈 메서드입니다.
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// 파이프라인 API를 사용하여 일련의 스트림을 쉽게 파이프로 연결하고
// 파이프라인이 완전히 완료되면 알림을 받습니다.
// 잠재적으로 매우 큰 tar 파일을 효율적으로 gzip으로 압축하는 파이프라인:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('파이프라인 실패.', err)
} else {
console.log('파이프라인 성공.')
}
})
pipeline
API는 프로미스 버전을 제공합니다.
stream.pipeline()
는 다음을 제외한 모든 스트림에서 stream.destroy(err)
를 호출합니다.
'end'
또는'close'
를 내보낸Readable
스트림입니다.'finish'
또는'close'
를 내보낸Writable
스트림입니다.
stream.pipeline()
는 callback
이 호출된 후 스트림에 매달린 이벤트 리스너를 남깁니다. 실패 후 스트림을 재사용하는 경우, 이로 인해 이벤트 리스너 누수 및 삼켜진 오류가 발생할 수 있습니다. 마지막 스트림이 읽을 수 있는 경우, 매달린 이벤트 리스너는 제거되어 마지막 스트림을 나중에 사용할 수 있습니다.
stream.pipeline()
는 오류가 발생하면 모든 스트림을 닫습니다. pipeline
이 소켓을 파괴하여 예상된 응답을 보내지 못하게 되는 즉시 IncomingRequest
를 pipeline
과 함께 사용하면 예상치 못한 동작이 발생할 수 있습니다. 아래 예시를 참조하십시오.
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // 해당 파일 없음
// `pipeline`이 이미 소켓을 파괴했으므로 이 메시지를 보낼 수 없습니다.
return res.end('오류!!!')
}
})
})
stream.compose(...streams)
[연혁]
버전 | 변경 사항 |
---|---|
v21.1.0, v20.10.0 | 스트림 클래스 지원이 추가되었습니다. |
v19.8.0, v18.16.0 | 웹 스트림 지원이 추가되었습니다. |
v16.9.0 | 다음에서 추가됨: v16.9.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- 반환: <stream.Duplex>
두 개 이상의 스트림을 첫 번째 스트림에 쓰고 마지막 스트림에서 읽는 Duplex
스트림으로 결합합니다. 제공된 각 스트림은 stream.pipeline
을 사용하여 다음 스트림으로 파이프됩니다. 스트림 중 하나에서 오류가 발생하면 외부 Duplex
스트림을 포함하여 모두 파괴됩니다.
stream.compose
는 차례로 다른 스트림으로 파이프될 수 있는 (그리고 파이프해야 하는) 새 스트림을 반환하므로 구성이 가능합니다. 대조적으로 스트림을 stream.pipeline
에 전달할 때 일반적으로 첫 번째 스트림은 읽기 가능한 스트림이고 마지막 스트림은 쓰기 가능한 스트림이므로 폐쇄 회로를 형성합니다.
Function
이 전달되면 source
Iterable
을 사용하는 팩토리 메서드여야 합니다.
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // 'HELLOWORLD' 출력
stream.compose
는 비동기 이터러블, 제너레이터 및 함수를 스트림으로 변환하는 데 사용할 수 있습니다.
AsyncIterable
은 읽기 가능한Duplex
로 변환됩니다.null
을 생성할 수 없습니다.AsyncGeneratorFunction
은 읽기/쓰기 변환Duplex
로 변환됩니다. 첫 번째 매개변수로 소스AsyncIterable
을 사용해야 합니다.null
을 생성할 수 없습니다.AsyncFunction
은 쓰기 가능한Duplex
로 변환됩니다.null
또는undefined
를 반환해야 합니다.
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// 비동기 이터러블을 읽기 가능한 Duplex로 변환합니다.
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// 비동기 제너레이터를 변환 Duplex로 변환합니다.
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// 비동기 함수를 쓰기 가능한 Duplex로 변환합니다.
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // 'HELLOWORLD' 출력
stream.compose
를 연산자로 사용하려면 readable.compose(stream)
를 참조하십시오.
stream.Readable.from(iterable[, options])
추가된 버전: v12.3.0, v10.17.0
iterable
<Iterable>Symbol.asyncIterator
또는Symbol.iterator
iterable 프로토콜을 구현하는 객체입니다. null 값이 전달되면 'error' 이벤트를 발생시킵니다.options
<Object>new stream.Readable([options])
에 제공되는 옵션입니다. 기본적으로Readable.from()
은options.objectMode
를false
로 명시적으로 선택하지 않는 한options.objectMode
를true
로 설정합니다.- 반환값: <stream.Readable>
반복자에서 읽을 수 있는 스트림을 만드는 유틸리티 메서드입니다.
const { Readable } = require('node:stream')
async function* generate() {
yield 'hello'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
Readable.from(string)
또는 Readable.from(buffer)
를 호출하면 성능상의 이유로 문자열 또는 버퍼가 반복되지 않아 다른 스트림의 의미와 일치하지 않습니다.
프로미스를 포함하는 Iterable
객체가 인수로 전달되면 처리되지 않은 거부가 발생할 수 있습니다.
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 처리되지 않은 거부
])
stream.Readable.fromWeb(readableStream[, options])
추가된 버전: v17.0.0
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
반환값: <stream.Readable>
stream.Readable.isDisturbed(stream)
추가된 버전: v16.8.0
stream
<stream.Readable> | <ReadableStream>- 반환값:
boolean
스트림에서 읽었거나 취소되었는지 여부를 반환합니다.
stream.isErrored(stream)
추가된 버전: v17.3.0, v16.14.0
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- 반환값: <boolean>
스트림에 오류가 발생했는지 여부를 반환합니다.
stream.isReadable(stream)
추가된 버전: v17.4.0, v16.14.0
stream
<Readable> | <Duplex> | <ReadableStream>- 반환값: <boolean>
스트림을 읽을 수 있는지 여부를 반환합니다.
stream.Readable.toWeb(streamReadable[, options])
추가된 버전: v17.0.0
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> 주어진stream.Readable
에서 읽을 때 백프레셔가 적용되기 전의 생성된ReadableStream
의 최대 내부 큐 크기입니다. 값을 제공하지 않으면 주어진stream.Readable
에서 가져옵니다.size
<Function> 주어진 데이터 청크의 크기를 지정하는 함수입니다. 값을 제공하지 않으면 모든 청크에 대해 크기가1
이 됩니다.chunk
<any>- 반환값: <number>
반환값: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
추가된 버전: v17.0.0
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
반환값: <stream.Writable>
stream.Writable.toWeb(streamWritable)
추가된 버전: v17.0.0
streamWritable
<stream.Writable>- 반환값: <WritableStream>
stream.Duplex.from(src)
[기록]
버전 | 변경 사항 |
---|---|
v19.5.0, v18.17.0 | 이제 src 인수는 ReadableStream 또는 WritableStream 이 될 수 있습니다. |
v16.8.0 | 추가된 버전: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
이중 스트림을 생성하는 유틸리티 메서드입니다.
Stream
은 쓰기 가능 스트림을 쓰기 가능한Duplex
로 변환하고 읽기 가능 스트림을Duplex
로 변환합니다.Blob
은 읽기 가능한Duplex
로 변환합니다.string
은 읽기 가능한Duplex
로 변환합니다.ArrayBuffer
는 읽기 가능한Duplex
로 변환합니다.AsyncIterable
은 읽기 가능한Duplex
로 변환합니다.null
을 생성할 수 없습니다.AsyncGeneratorFunction
은 읽기/쓰기 변환Duplex
로 변환합니다. 첫 번째 매개변수로 소스AsyncIterable
을 가져와야 합니다.null
을 생성할 수 없습니다.AsyncFunction
은 쓰기 가능한Duplex
로 변환합니다.null
또는undefined
를 반환해야 합니다.Object ({ writable, readable })
은readable
과writable
을Stream
으로 변환한 다음Duplex
로 결합합니다. 여기서Duplex
는writable
에 쓰고readable
에서 읽습니다.Promise
는 읽기 가능한Duplex
로 변환합니다.null
값은 무시됩니다.ReadableStream
은 읽기 가능한Duplex
로 변환합니다.WritableStream
은 쓰기 가능한Duplex
로 변환합니다.- 반환값: <stream.Duplex>
프로미스를 포함하는 Iterable
객체가 인수로 전달되면 처리되지 않은 거부가 발생할 수 있습니다.
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // 처리되지 않은 거부
])
stream.Duplex.fromWeb(pair[, options])
Added in: v17.0.0
[Stable: 1 - Experimental]
Stable: 1 Stability: 1 - 실험적
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>Returns: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
Added in: v17.0.0
[Stable: 1 - Experimental]
Stable: 1 Stability: 1 - 실험적
streamDuplex
<stream.Duplex>- 반환값: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[History]
Version | Changes |
---|---|
v19.7.0, v18.16.0 | ReadableStream 및 WritableStream 에 대한 지원 추가. |
v15.4.0 | 추가됨: v15.4.0 |
signal
<AbortSignal> 가능한 취소를 나타내는 신호stream
<Stream> | <ReadableStream> | <WritableStream> 신호를 연결할 스트림.
AbortSignal을 읽기 가능 또는 쓰기 가능한 스트림에 연결합니다. 이를 통해 코드는 AbortController
를 사용하여 스트림 파괴를 제어할 수 있습니다.
전달된 AbortSignal
에 해당하는 AbortController
에서 abort
를 호출하면 스트림에서 .destroy(new AbortError())
를 호출하는 것과 동일하게 작동하고 웹스트림의 경우 controller.error(new AbortError())
와 동일하게 작동합니다.
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// 나중에 스트림을 닫는 작업을 중단합니다.
controller.abort()
또는 비동기 반복자로서 읽기 가능한 스트림과 함께 AbortSignal
을 사용합니다.
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // 시간 초과 설정
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// 작업이 취소되었습니다.
} else {
throw e
}
}
})()
또는 ReadableStream과 함께 AbortSignal
을 사용합니다.
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// 작업이 취소되었습니다.
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
추가된 버전: v19.9.0, v18.17.0
스트림에서 사용되는 기본 highWaterMark를 반환합니다. 기본값은 65536
(64KiB)이며, objectMode
의 경우 16
입니다.
stream.setDefaultHighWaterMark(objectMode, value)
추가된 버전: v19.9.0, v18.17.0
스트림에서 사용되는 기본 highWaterMark를 설정합니다.
스트림 구현자를 위한 API
node:stream
모듈 API는 JavaScript의 프로토타입 상속 모델을 사용하여 스트림을 쉽게 구현할 수 있도록 설계되었습니다.
먼저 스트림 개발자는 네 가지 기본 스트림 클래스(stream.Writable
, stream.Readable
, stream.Duplex
또는 stream.Transform
) 중 하나를 확장하는 새 JavaScript 클래스를 선언하고 적절한 부모 클래스 생성자를 호출해야 합니다.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
스트림을 확장할 때 사용자가 어떤 옵션을 제공할 수 있고 제공해야 하는지, 그리고 이를 기본 생성자로 전달하기 전에 어떤 옵션이 필요한지 기억하십시오. 예를 들어 구현에서 autoDestroy
및 emitClose
옵션에 대한 가정이 있는 경우 사용자가 이를 재정의하도록 허용하지 마십시오. 모든 옵션을 암묵적으로 전달하는 대신 어떤 옵션이 전달되는지 명시적으로 지정하십시오.
새 스트림 클래스는 생성되는 스트림 유형에 따라 아래 표에 자세히 설명된 대로 하나 이상의 특정 메서드를 구현해야 합니다.
사용 사례 | 클래스 | 구현할 메서드 |
---|---|---|
읽기 전용 | Readable | _read() |
쓰기 전용 | Writable | _write() , _writev() , _final() |
읽기 및 쓰기 | Duplex | _read() , _write() , _writev() , _final() |
기록된 데이터를 조작한 다음 결과를 읽기 | Transform | _transform() , _flush() , _final() |
스트림에 대한 구현 코드는 소비자가 사용하도록 의도된 스트림의 "공용" 메서드를 절대로 호출해서는 안 됩니다(자세한 내용은 스트림 소비자용 API 섹션에 설명되어 있음). 이렇게 하면 스트림을 소비하는 애플리케이션 코드에서 부작용이 발생할 수 있습니다.
.emit()
을 통해 'error'
, 'data'
, 'end'
, 'finish'
및 'close'
와 같은 내부 이벤트를 발생시키거나 write()
, end()
, cork()
, uncork()
, read()
및 destroy()
와 같은 공용 메서드를 재정의하지 마십시오. 이렇게 하면 현재 및 미래의 스트림 불변성이 깨져 다른 스트림, 스트림 유틸리티 및 사용자 기대치와의 동작 및/또는 호환성 문제가 발생할 수 있습니다.
단순화된 생성
추가된 버전: v1.2.0
많은 간단한 경우, 상속에 의존하지 않고 스트림을 생성하는 것이 가능합니다. 이는 stream.Writable
, stream.Readable
, stream.Duplex
또는 stream.Transform
객체의 인스턴스를 직접 생성하고 적절한 메서드를 생성자 옵션으로 전달함으로써 달성할 수 있습니다.
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// 상태를 초기화하고 리소스를 로드합니다...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// 리소스를 해제합니다...
},
})
쓰기 가능한 스트림 구현
Writable
스트림을 구현하기 위해 stream.Writable
클래스가 확장됩니다.
사용자 정의 Writable
스트림은 반드시 new stream.Writable([options])
생성자를 호출하고 writable._write()
및/또는 writable._writev()
메서드를 구현해야 합니다.
new stream.Writable([options])
[히스토리]
버전 | 변경 사항 |
---|---|
v22.0.0 | 기본 highWaterMark를 높입니다. |
v15.5.0 | AbortSignal 전달을 지원합니다. |
v14.0.0 | autoDestroy 옵션의 기본값을 true 로 변경합니다. |
v11.2.0, v10.16.0 | 'finish' 를 방출하거나 오류가 발생할 때 스트림을 자동으로 destroy() 하는 autoDestroy 옵션을 추가합니다. |
v10.0.0 | destroy시 'close' 를 방출할지 여부를 지정하는 emitClose 옵션을 추가합니다. |
options
<Object>highWaterMark
<number>stream.write()
가false
를 반환하기 시작하는 버퍼 레벨입니다. 기본값:65536
(64 KiB), 또는objectMode
스트림의 경우16
.decodeStrings
<boolean>stream.write()
에 전달된string
을stream._write()
로 전달하기 전에Buffer
로 인코딩할지 여부(stream.write()
호출에 지정된 인코딩 사용). 다른 유형의 데이터는 변환되지 않습니다(예:Buffer
는string
으로 디코딩되지 않음). false로 설정하면string
이 변환되지 않습니다. 기본값:true
.defaultEncoding
<string>stream.write()
에 인코딩이 인수로 지정되지 않은 경우 사용되는 기본 인코딩입니다. 기본값:'utf8'
.objectMode
<boolean>stream.write(anyObj)
가 유효한 작업인지 여부입니다. 설정되면 스트림 구현에서 지원하는 경우 string, <Buffer>, <TypedArray> 또는 <DataView> 이외의 JavaScript 값을 쓰는 것이 가능해집니다. 기본값:false
.emitClose
<boolean> 스트림이 삭제된 후'close'
를 방출해야 하는지 여부입니다. 기본값:true
.write
<Function>stream._write()
메서드에 대한 구현입니다.writev
<Function>stream._writev()
메서드에 대한 구현입니다.destroy
<Function>stream._destroy()
메서드에 대한 구현입니다.final
<Function>stream._final()
메서드에 대한 구현입니다.construct
<Function>stream._construct()
메서드에 대한 구현입니다.autoDestroy
<boolean> 이 스트림이 종료된 후 자동으로.destroy()
를 호출해야 하는지 여부입니다. 기본값:true
.signal
<AbortSignal> 가능한 취소를 나타내는 신호입니다.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// stream.Writable() 생성자를 호출합니다.
super(options)
// ...
}
}
또는 ES6 이전 스타일 생성자를 사용하는 경우:
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
또는 단순화된 생성자 접근 방식을 사용하는 경우:
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
전달된 AbortSignal
에 해당하는 AbortController
에서 abort
를 호출하는 것은 쓰기 가능한 스트림에서 .destroy(new AbortError())
를 호출하는 것과 동일하게 동작합니다.
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// 나중에 스트림을 닫는 작업을 중단합니다
controller.abort()
writable._construct(callback)
추가된 버전: v15.0.0
callback
<Function> 스트림 초기화가 완료되었을 때 이 함수를 호출합니다 (오류 인수를 선택적으로 사용할 수 있습니다).
_construct()
메서드는 직접 호출해서는 안 됩니다. 하위 클래스에서 구현할 수 있으며, 구현된 경우 내부 Writable
클래스 메서드에서만 호출합니다.
이 선택적 함수는 스트림 생성자가 반환된 후 한 틱 뒤에 호출되어 callback
이 호출될 때까지 모든 _write()
, _final()
및 _destroy()
호출을 지연시킵니다. 이는 스트림을 사용하기 전에 상태를 초기화하거나 비동기적으로 리소스를 초기화하는 데 유용합니다.
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[기록]
버전 | 변경 사항 |
---|---|
v12.11.0 | _writev() 를 제공하는 경우 _write() 는 선택 사항입니다. |
chunk
<Buffer> | <string> | <any> 쓸Buffer
로,stream.write()
에 전달된string
에서 변환됩니다. 스트림의decodeStrings
옵션이false
이거나 스트림이 객체 모드로 작동하는 경우 청크는 변환되지 않고stream.write()
에 전달된 값이 됩니다.encoding
<string> 청크가 문자열인 경우encoding
은 해당 문자열의 문자 인코딩입니다. 청크가Buffer
이거나 스트림이 객체 모드로 작동하는 경우encoding
은 무시될 수 있습니다.callback
<Function> 제공된 청크에 대한 처리가 완료되면 (선택적으로 오류 인수와 함께) 이 함수를 호출합니다.
모든 Writable
스트림 구현은 기본 리소스로 데이터를 보내기 위해 writable._write()
및/또는 writable._writev()
메서드를 제공해야 합니다.
Transform
스트림은 자체 writable._write()
구현을 제공합니다.
이 함수는 애플리케이션 코드에서 직접 호출해서는 안 됩니다. 하위 클래스에서 구현해야 하며, 내부 Writable
클래스 메서드에서만 호출해야 합니다.
callback
함수는 쓰기가 성공적으로 완료되었거나 오류와 함께 실패했음을 알리기 위해 writable._write()
내부에서 동기적으로 또는 비동기적으로(즉, 다른 틱) 호출해야 합니다. callback
에 전달되는 첫 번째 인수는 호출이 실패한 경우 Error
객체여야 하고, 쓰기가 성공한 경우 null
이어야 합니다.
writable._write()
가 호출된 시점과 callback
이 호출된 시점 사이에 발생하는 모든 writable.write()
호출은 작성된 데이터를 버퍼링하게 됩니다. callback
이 호출되면 스트림은 'drain'
이벤트를 발생시킬 수 있습니다. 스트림 구현이 한 번에 여러 데이터 청크를 처리할 수 있는 경우 writable._writev()
메서드를 구현해야 합니다.
생성자 옵션에서 decodeStrings
속성이 명시적으로 false
로 설정된 경우 chunk
는 .write()
에 전달된 것과 동일한 객체로 유지되며 Buffer
가 아닌 문자열일 수 있습니다. 이는 특정 문자열 데이터 인코딩에 최적화된 처리를 하는 구현을 지원하기 위한 것입니다. 이 경우 encoding
인수는 문자열의 문자 인코딩을 나타냅니다. 그렇지 않으면 encoding
인수를 안전하게 무시할 수 있습니다.
writable._write()
메서드는 정의하는 클래스 내부용이며 사용자 프로그램에서 직접 호출해서는 안 되므로 밑줄로 시작합니다.
writable._writev(chunks, callback)
chunks
<Object[]> 쓰여질 데이터입니다. 값은 쓰여질 개별 데이터 청크를 나타내는 <Object> 배열입니다. 이 객체의 속성은 다음과 같습니다.callback
<Function> 제공된 청크에 대한 처리가 완료되면 (선택적으로 오류 인수를 사용하여) 호출될 콜백 함수입니다.
이 함수는 애플리케이션 코드에서 직접 호출해서는 안 됩니다. 자식 클래스에서 구현하고 내부 Writable
클래스 메서드에서만 호출해야 합니다.
writable._writev()
메서드는 한 번에 여러 데이터 청크를 처리할 수 있는 스트림 구현에서 writable._write()
에 추가하거나 대안으로 구현할 수 있습니다. 구현되어 있고 이전 쓰기에서 버퍼링된 데이터가 있는 경우 _write()
대신 _writev()
가 호출됩니다.
writable._writev()
메서드는 정의하는 클래스 내부이고 사용자 프로그램에서 직접 호출해서는 안 되므로 밑줄로 접두사가 붙습니다.
writable._destroy(err, callback)
추가된 버전: v8.0.0
err
<Error> 가능한 오류입니다.callback
<Function> 선택적 오류 인수를 사용하는 콜백 함수입니다.
_destroy()
메서드는 writable.destroy()
에 의해 호출됩니다. 자식 클래스에서 재정의할 수 있지만 직접 호출해서는 안 됩니다.
writable._final(callback)
추가된 버전: v8.0.0
callback
<Function> 남은 데이터를 모두 쓰고 완료했을 때 (선택적으로 오류 인수를 포함하여) 이 함수를 호출합니다.
_final()
메서드는 직접 호출해서는 안 됩니다. 하위 클래스에서 구현할 수 있으며, 구현된 경우 내부 Writable
클래스 메서드에 의해서만 호출됩니다.
이 선택적 함수는 스트림이 닫히기 전에 호출되며, callback
이 호출될 때까지 'finish'
이벤트를 지연시킵니다. 이는 스트림이 끝나기 전에 리소스를 닫거나 버퍼링된 데이터를 쓰는 데 유용합니다.
쓰기 중 오류
writable._write()
, writable._writev()
및 writable._final()
메서드 처리 중에 발생하는 오류는 콜백을 호출하고 오류를 첫 번째 인수로 전달하여 전파해야 합니다. 이러한 메서드 내에서 Error
를 던지거나 수동으로 'error'
이벤트를 발생시키면 정의되지 않은 동작이 발생합니다.
Writable
스트림이 오류를 발생시키면 Readable
스트림이 Writable
스트림에 파이프되는 경우 Readable
스트림은 파이프 해제됩니다.
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
쓰기 가능한 스트림 예시
다음은 매우 단순한 (그리고 다소 무의미한) 사용자 정의 Writable
스트림 구현을 보여줍니다. 이 특정 Writable
스트림 인스턴스는 특별히 유용하지 않지만, 이 예시는 사용자 정의 Writable
스트림 인스턴스의 각 필수 요소를 보여줍니다.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
쓰기 가능 스트림에서 버퍼 디코딩하기
버퍼 디코딩은 예를 들어 입력이 문자열인 변환기를 사용할 때와 같이 일반적인 작업입니다. 이는 UTF-8과 같은 멀티바이트 문자 인코딩을 사용할 때 사소한 과정이 아닙니다. 다음 예시는 StringDecoder
및 Writable
을 사용하여 멀티바이트 문자열을 디코딩하는 방법을 보여줍니다.
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
읽기 가능 스트림 구현하기
stream.Readable
클래스는 Readable
스트림을 구현하기 위해 확장됩니다.
사용자 정의 Readable
스트림은 반드시 new stream.Readable([options])
생성자를 호출하고 readable._read()
메서드를 구현해야 합니다.
new stream.Readable([options])
[기록]
버전 | 변경 사항 |
---|---|
v22.0.0 | 기본 highWaterMark 범프. |
v15.5.0 | AbortSignal 전달 지원. |
v14.0.0 | autoDestroy 옵션 기본값을 true 로 변경. |
v11.2.0, v10.16.0 | 'end' 또는 오류를 발생시킬 때 자동으로 스트림을 destroy() 하는 autoDestroy 옵션 추가. |
options
<Object>highWaterMark
<number> 기본 리소스에서 읽기를 중단하기 전에 내부 버퍼에 저장할 최대 바이트 수. 기본값:65536
(64 KiB) 또는objectMode
스트림의 경우16
.encoding
<string> 지정된 경우 버퍼는 지정된 인코딩을 사용하여 문자열로 디코딩됩니다. 기본값:null
.objectMode
<boolean> 이 스트림이 객체 스트림처럼 작동해야 하는지 여부. 즉,stream.read(n)
은 크기n
의Buffer
대신 단일 값을 반환합니다. 기본값:false
.emitClose
<boolean> 스트림이 파괴된 후'close'
를 내보내야 하는지 여부. 기본값:true
.read
<Function>stream._read()
메서드 구현.destroy
<Function>stream._destroy()
메서드 구현.construct
<Function>stream._construct()
메서드 구현.autoDestroy
<boolean> 이 스트림이 종료된 후 자동으로.destroy()
를 자체적으로 호출해야 하는지 여부. 기본값:true
.signal
<AbortSignal> 가능한 취소를 나타내는 신호.
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// stream.Readable(options) 생성자를 호출합니다.
super(options)
// ...
}
}
또는 ES6 이전 스타일의 생성자를 사용하는 경우:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
또는 단순화된 생성자 접근 방식을 사용하는 경우:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
전달된 AbortSignal
에 해당하는 AbortController
에서 abort
를 호출하면 생성된 읽기 가능에서 .destroy(new AbortError())
를 호출하는 것과 동일하게 동작합니다.
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// 나중에 스트림을 닫는 작업을 중단합니다.
controller.abort()
readable._construct(callback)
추가된 버전: v15.0.0
callback
<Function> 스트림 초기화가 완료되었을 때 이 함수를 (선택적으로 오류 인수와 함께) 호출합니다.
_construct()
메서드는 직접 호출해서는 안 됩니다. 자식 클래스에서 구현할 수 있으며, 구현하는 경우 내부 Readable
클래스 메서드에서만 호출합니다.
이 선택적 함수는 스트림 생성자에 의해 다음 틱으로 예약되며, callback
이 호출될 때까지 _read()
및 _destroy()
호출을 지연시킵니다. 이는 스트림을 사용하기 전에 상태를 초기화하거나 비동기적으로 리소스를 초기화하는 데 유용합니다.
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
추가된 버전: v0.9.4
size
<number> 비동기적으로 읽을 바이트 수
이 함수는 애플리케이션 코드에서 직접 호출해서는 안 됩니다. 자식 클래스에서 구현해야 하며, 내부 Readable
클래스 메서드에서만 호출해야 합니다.
모든 Readable
스트림 구현은 기본 리소스에서 데이터를 가져오기 위해 readable._read()
메서드 구현을 제공해야 합니다.
readable._read()
가 호출되면 리소스에서 데이터를 사용할 수 있는 경우 구현은 this.push(dataChunk)
메서드를 사용하여 해당 데이터를 읽기 큐에 푸시하기 시작해야 합니다. 스트림이 더 많은 데이터를 수락할 준비가 되면 this.push(dataChunk)
를 호출할 때마다 _read()
가 다시 호출됩니다. readable.push()
가 false
를 반환할 때까지 _read()
는 리소스에서 계속 읽고 데이터를 푸시할 수 있습니다. _read()
가 중지된 후 다시 호출될 때만 큐에 추가 데이터를 푸시하기를 재개해야 합니다.
readable._read()
메서드가 호출되면 readable.push()
메서드를 통해 더 많은 데이터가 푸시될 때까지 다시 호출되지 않습니다. 빈 버퍼 및 문자열과 같은 빈 데이터는 readable._read()
가 호출되지 않도록 합니다.
size
인수는 참고용입니다. "읽기"가 데이터를 반환하는 단일 작업인 구현의 경우 size
인수를 사용하여 가져올 데이터 양을 결정할 수 있습니다. 다른 구현은 이 인수를 무시하고 사용 가능해질 때마다 데이터를 제공할 수 있습니다. stream.push(chunk)
를 호출하기 전에 size
바이트를 사용할 수 있을 때까지 "기다릴" 필요는 없습니다.
readable._read()
메서드는 정의하는 클래스 내부이므로 밑줄이 앞에 붙어 있으며 사용자 프로그램에서 직접 호출해서는 안 됩니다.
readable._destroy(err, callback)
추가된 버전: v8.0.0
err
<Error> 가능한 오류입니다.callback
<Function> 선택적 오류 인자를 받는 콜백 함수입니다.
_destroy()
메서드는 readable.destroy()
에 의해 호출됩니다. 자식 클래스에서 오버라이드할 수 있지만 직접 호출해서는 안 됩니다.
readable.push(chunk[, encoding])
[기록]
버전 | 변경 사항 |
---|---|
v22.0.0, v20.13.0 | chunk 인자는 이제 TypedArray 또는 DataView 인스턴스가 될 수 있습니다. |
v8.0.0 | chunk 인자는 이제 Uint8Array 인스턴스가 될 수 있습니다. |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> 읽기 큐에 푸시할 데이터 청크입니다. 객체 모드에서 작동하지 않는 스트림의 경우,chunk
는 <string>, <Buffer>, <TypedArray> 또는 <DataView>여야 합니다. 객체 모드 스트림의 경우chunk
는 임의의 JavaScript 값일 수 있습니다.encoding
<string> 문자열 청크의 인코딩입니다.'utf8'
또는'ascii'
와 같은 유효한Buffer
인코딩이어야 합니다.- 반환: <boolean> 추가 데이터 청크를 계속 푸시할 수 있는 경우
true
, 그렇지 않으면false
입니다.
chunk
가 <Buffer>, <TypedArray>, <DataView> 또는 <string>일 때, 해당 데이터 청크는 스트림 사용자가 소비할 수 있도록 내부 큐에 추가됩니다. chunk
를 null
로 전달하면 스트림의 끝(EOF)을 알리고, 그 이후에는 더 이상 데이터를 쓸 수 없습니다.
Readable
이 일시 중지 모드로 작동 중일 때, readable.push()
로 추가된 데이터는 'readable'
이벤트가 발생하면 readable.read()
메서드를 호출하여 읽을 수 있습니다.
Readable
이 흐름 모드로 작동 중일 때, readable.push()
로 추가된 데이터는 'data'
이벤트를 발생시켜 전달됩니다.
readable.push()
메서드는 최대한 유연하게 설계되었습니다. 예를 들어 일시 중지/재개 메커니즘과 데이터 콜백을 제공하는 하위 수준 소스를 래핑할 때, 사용자 지정 Readable
인스턴스에서 하위 수준 소스를 래핑할 수 있습니다.
// `_source`는 readStop() 및 readStart() 메서드와 데이터가 있을 때 호출되는 `ondata` 멤버 및 데이터가 끝났을 때 호출되는 `onend` 멤버가 있는 객체입니다.
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// 데이터가 있을 때마다 내부 버퍼로 푸시합니다.
this._source.ondata = chunk => {
// push()가 false를 반환하면 소스에서 읽기를 중지합니다.
if (!this.push(chunk)) this._source.readStop()
}
// 소스가 끝나면 EOF 신호를 보내는 `null` 청크를 푸시합니다.
this._source.onend = () => {
this.push(null)
}
}
// 스트림에서 더 많은 데이터를 가져오고 싶을 때 _read()가 호출됩니다.
// 이 경우 권고 크기 인자는 무시됩니다.
_read(size) {
this._source.readStart()
}
}
readable.push()
메서드는 내용을 내부 버퍼로 푸시하는 데 사용됩니다. readable._read()
메서드에 의해 구동될 수 있습니다.
객체 모드에서 작동하지 않는 스트림의 경우, readable.push()
의 chunk
매개변수가 undefined
이면 빈 문자열 또는 버퍼로 처리됩니다. 자세한 내용은 readable.push('')
를 참조하십시오.
읽기 중 오류
readable._read()
처리 중에 발생하는 오류는 readable.destroy(err)
메서드를 통해 전파되어야 합니다. readable._read()
내에서 Error
를 throw하거나 'error'
이벤트를 수동으로 발생시키면 정의되지 않은 동작이 발생합니다.
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// Do some work.
}
},
})
카운팅 스트림 예제
다음은 1부터 1,000,000까지 숫자를 오름차순으로 내보낸 다음 끝나는 Readable
스트림의 기본 예제입니다.
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
듀플렉스 스트림 구현
Duplex
스트림은 TCP 소켓 연결과 같이 Readable
와 Writable
을 모두 구현하는 스트림입니다.
JavaScript는 다중 상속을 지원하지 않으므로 stream.Duplex
클래스는 stream.Readable
및 stream.Writable
클래스를 확장하는 대신 Duplex
스트림을 구현하기 위해 확장됩니다.
stream.Duplex
클래스는 원형적으로 stream.Readable
에서 상속하고 기생적으로 stream.Writable
에서 상속하지만 instanceof
는 stream.Writable
에서 Symbol.hasInstance
를 재정의하여 두 기본 클래스에 대해 올바르게 작동합니다.
사용자 지정 Duplex
스트림은 new stream.Duplex([options])
생성자를 호출하고 readable._read()
및 writable._write()
메서드 둘 다를 구현해야 합니다.
new stream.Duplex(options)
[History]
버전 | 변경 사항 |
---|---|
v8.4.0 | readableHighWaterMark 및 writableHighWaterMark 옵션이 이제 지원됩니다. |
options
<Object>Writable
및Readable
생성자에 모두 전달됩니다. 또한 다음 필드가 있습니다.allowHalfOpen
<boolean>false
로 설정하면 읽기 가능한 쪽이 끝나면 스트림이 쓰기 가능한 쪽을 자동으로 종료합니다. 기본값:true
.readable
<boolean>Duplex
가 읽기 가능해야 하는지 설정합니다. 기본값:true
.writable
<boolean>Duplex
가 쓰기 가능해야 하는지 설정합니다. 기본값:true
.readableObjectMode
<boolean> 스트림의 읽기 가능한 쪽에 대한objectMode
를 설정합니다.objectMode
가true
이면 효과가 없습니다. 기본값:false
.writableObjectMode
<boolean> 스트림의 쓰기 가능한 쪽에 대한objectMode
를 설정합니다.objectMode
가true
이면 효과가 없습니다. 기본값:false
.readableHighWaterMark
<number> 스트림의 읽기 가능한 쪽에 대한highWaterMark
를 설정합니다.highWaterMark
가 제공된 경우 효과가 없습니다.writableHighWaterMark
<number> 스트림의 쓰기 가능한 쪽에 대한highWaterMark
를 설정합니다.highWaterMark
가 제공된 경우 효과가 없습니다.
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
또는 pre-ES6 스타일 생성자를 사용할 때:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
또는 단순화된 생성자 접근 방식을 사용하여:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
파이프라인을 사용할 때:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // 버퍼 대신 문자열 입력을 허용합니다.
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// 유효한 json인지 확인합니다.
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('실패', err)
} else {
console.log('완료됨')
}
}
)
이중 스트림 예시
다음은 데이터가 쓰여질 수 있고 데이터가 읽혀질 수 있는 가상 하위 레벨 소스 객체를 래핑하는 간단한 Duplex
스트림의 예시입니다. 단, API가 Node.js 스트림과 호환되지 않습니다. 다음은 들어오는 쓰기 데이터를 Writable
인터페이스를 통해 버퍼링하고 Readable
인터페이스를 통해 다시 읽는 간단한 Duplex
스트림의 예시입니다.
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// 기본 소스는 문자열만 처리합니다.
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
Duplex
스트림의 가장 중요한 측면은 Readable
및 Writable
측면이 단일 객체 인스턴스 내에 공존함에도 불구하고 서로 독립적으로 작동한다는 것입니다.
객체 모드 이중 스트림
Duplex
스트림의 경우 objectMode
는 각각 readableObjectMode
및 writableObjectMode
옵션을 사용하여 Readable
또는 Writable
측면 중 하나에만 단독으로 설정할 수 있습니다.
예를 들어 다음 예제에서는 Readable
측면에서 16진수 문자열로 변환되는 JavaScript 숫자를 허용하는 객체 모드 Writable
측면이 있는 새 Transform
스트림(일종의 Duplex
스트림)이 생성됩니다.
const { Transform } = require('node:stream')
// 모든 Transform 스트림은 Duplex 스트림이기도 합니다.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// 필요한 경우 청크를 숫자로 강제 변환합니다.
chunk |= 0
// 청크를 다른 것으로 변환합니다.
const data = chunk.toString(16)
// 데이터를 읽기 가능한 대기열로 푸시합니다.
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// 출력: 01
myTransform.write(10)
// 출력: 0a
myTransform.write(100)
// 출력: 64
변환 스트림 구현
Transform
스트림은 입력으로부터 출력이 어떤 방식으로 계산되는 Duplex
스트림입니다. 예시로는 데이터를 압축, 암호화 또는 해독하는 zlib 스트림이나 crypto 스트림이 있습니다.
출력 크기가 입력 크기와 같거나, 동일한 개수의 청크이거나, 동일한 시간에 도착해야 한다는 요구 사항은 없습니다. 예를 들어, Hash
스트림은 입력이 종료될 때 제공되는 단일 출력 청크만 가집니다. zlib
스트림은 입력보다 훨씬 작거나 훨씬 큰 출력을 생성합니다.
stream.Transform
클래스는 Transform
스트림을 구현하기 위해 확장됩니다.
stream.Transform
클래스는 stream.Duplex
에서 프로토타입으로 상속받고 writable._write()
및 readable._read()
메서드의 자체 버전을 구현합니다. 사용자 정의 Transform
구현은 transform._transform()
메서드를 반드시 구현해야 하고, transform._flush()
메서드도 구현할 수 있습니다.
스트림에 쓰여진 데이터가 Readable
측의 출력이 소비되지 않으면 스트림의 Writable
측이 일시 중지될 수 있으므로 Transform
스트림을 사용할 때 주의해야 합니다.
new stream.Transform([options])
options
<Object>Writable
및Readable
생성자 모두에 전달됩니다. 다음과 같은 필드도 있습니다.transform
<Function>stream._transform()
메서드에 대한 구현입니다.flush
<Function>stream._flush()
메서드에 대한 구현입니다.
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
또는 pre-ES6 스타일 생성자를 사용하는 경우:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
또는 단순화된 생성자 접근 방식을 사용하는 경우:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
이벤트: 'end'
'end'
이벤트는 stream.Readable
클래스에서 발생합니다. 'end'
이벤트는 모든 데이터가 출력된 후, 즉 transform._flush()
의 콜백이 호출된 후에 발생합니다. 오류가 발생한 경우에는 'end'
가 발생하지 않아야 합니다.
이벤트: 'finish'
'finish'
이벤트는 stream.Writable
클래스에서 발생합니다. 'finish'
이벤트는 stream.end()
가 호출되고 모든 청크가 stream._transform()
에 의해 처리된 후에 발생합니다. 오류가 발생한 경우에는 'finish'
가 발생하지 않아야 합니다.
transform._flush(callback)
callback
<Function> 남은 데이터가 플러시될 때 호출할 콜백 함수입니다 (선택적으로 오류 인수와 데이터 포함).
이 함수는 응용 프로그램 코드에서 직접 호출해서는 안 됩니다. 자식 클래스에서 구현해야 하며, 내부 Readable
클래스 메서드에서만 호출해야 합니다.
경우에 따라 변환 작업은 스트림 끝에 추가 데이터를 내보내야 할 수 있습니다. 예를 들어 zlib
압축 스트림은 출력을 최적으로 압축하는 데 사용되는 내부 상태의 양을 저장합니다. 그러나 스트림이 종료되면 압축된 데이터가 완전하도록 추가 데이터를 플러시해야 합니다.
사용자 정의 Transform
구현에서는 transform._flush()
메서드를 구현할 수 있습니다. 이는 더 이상 소비할 쓰여진 데이터가 없지만 'end'
이벤트가 Readable
스트림의 끝을 알리기 전에 호출됩니다.
transform._flush()
구현 내에서 transform.push()
메서드를 적절하게 0회 이상 호출할 수 있습니다. 플러시 작업이 완료되면 callback
함수를 호출해야 합니다.
transform._flush()
메서드는 클래스 내부에서 사용되므로 밑줄로 접두사가 붙어 있으며 사용자 프로그램에서 직접 호출해서는 안 됩니다.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> 변환될Buffer
로,stream.write()
에 전달된string
에서 변환됩니다. 스트림의decodeStrings
옵션이false
이거나 스트림이 객체 모드에서 작동하는 경우, 청크는 변환되지 않으며stream.write()
에 전달된 값 그대로 유지됩니다.encoding
<string> 청크가 문자열이면 이것은 인코딩 유형입니다. 청크가 버퍼이면 이것은 특수 값'buffer'
입니다. 이 경우 무시하세요.callback
<Function> 제공된chunk
가 처리된 후 호출될 콜백 함수(선택적으로 오류 인수 및 데이터 포함).
이 함수는 응용 프로그램 코드에서 직접 호출해서는 안 됩니다. 하위 클래스에서 구현해야 하며, 내부 Readable
클래스 메서드에서만 호출해야 합니다.
모든 Transform
스트림 구현은 입력을 받아들이고 출력을 생성하는 _transform()
메서드를 제공해야 합니다. transform._transform()
구현은 작성 중인 바이트를 처리하고 출력을 계산한 다음 transform.push()
메서드를 사용하여 읽기 가능한 부분으로 해당 출력을 전달합니다.
transform.push()
메서드는 청크의 결과로 출력할 양에 따라 단일 입력 청크에서 출력을 생성하기 위해 0번 이상 호출될 수 있습니다.
특정 입력 데이터 청크에서 출력이 생성되지 않을 수도 있습니다.
callback
함수는 현재 청크가 완전히 소비된 경우에만 호출해야 합니다. callback
에 전달되는 첫 번째 인수는 입력 처리 중에 오류가 발생한 경우 Error
객체여야 하고, 그렇지 않으면 null
이어야 합니다. 두 번째 인수가 callback
에 전달되면 transform.push()
메서드로 전달되지만, 첫 번째 인수가 falsy인 경우에만 전달됩니다. 즉, 다음은 동일합니다.
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
transform._transform()
메서드는 밑줄로 시작하는데, 이는 해당 메서드를 정의하는 클래스 내부의 메서드이며, 사용자 프로그램에서 직접 호출해서는 안 되기 때문입니다.
transform._transform()
은 병렬로 호출되지 않습니다. 스트림은 대기열 메커니즘을 구현하며, 다음 청크를 받으려면 callback
을 동기적으로 또는 비동기적으로 호출해야 합니다.
클래스: stream.PassThrough
stream.PassThrough
클래스는 입력 바이트를 출력으로 그대로 전달하는 Transform
스트림의 간단한 구현입니다. 주로 예시 및 테스트용이지만, stream.PassThrough
가 새로운 종류의 스트림을 위한 빌딩 블록으로 유용한 일부 사용 사례도 있습니다.
추가 정보
비동기 생성기 및 비동기 반복자와의 스트림 호환성
JavaScript에서 비동기 생성기와 반복자가 지원됨에 따라, 비동기 생성기는 현재 시점에서 사실상 최상위 수준의 언어 수준 스트림 구성 요소입니다.
아래에는 비동기 생성기 및 비동기 반복자와 함께 Node.js 스트림을 사용하는 몇 가지 일반적인 상호 운용 사례가 제공됩니다.
비동기 반복자를 사용하여 읽기 가능 스트림 소비
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
비동기 반복자는 처리되지 않은 사후 소멸 오류를 방지하기 위해 스트림에 영구 오류 핸들러를 등록합니다.
비동기 생성기를 사용하여 읽기 가능 스트림 생성
Node.js 읽기 가능 스트림은 Readable.from()
유틸리티 메서드를 사용하여 비동기 생성기로부터 생성할 수 있습니다.
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
비동기 반복자에서 쓰기 가능 스트림으로 파이핑
비동기 반복자에서 쓰기 가능 스트림으로 쓸 때는 백프레셔 및 오류를 올바르게 처리해야 합니다. stream.pipeline()
는 백프레셔 및 백프레셔 관련 오류의 처리를 추상화합니다.
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// 콜백 패턴
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'value returned')
}
}).on('close', () => {
ac.abort()
})
// 프로미스 패턴
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'value returned')
})
.catch(err => {
console.error(err)
ac.abort()
})
이전 Node.js 버전과의 호환성
Node.js 0.10 이전에는 Readable
스트림 인터페이스가 더 단순했지만, 기능이 부족하고 활용도가 떨어졌습니다.
stream.read()
메서드 호출을 기다리는 대신,'data'
이벤트가 즉시 발생하기 시작했습니다. 데이터를 처리하는 방법을 결정하기 위해 어느 정도 작업을 수행해야 하는 애플리케이션은 데이터가 손실되지 않도록 읽은 데이터를 버퍼에 저장해야 했습니다.stream.pause()
메서드는 보장된 것이 아니라 권고 사항이었습니다. 이는 스트림이 일시 중지된 상태일 때조차도'data'
이벤트를 수신할 준비를 해야 함을 의미했습니다.
Node.js 0.10에서 Readable
클래스가 추가되었습니다. 이전 Node.js 프로그램과의 하위 호환성을 위해 Readable
스트림은 'data'
이벤트 핸들러가 추가되거나 stream.resume()
메서드가 호출될 때 "흐름 모드"로 전환됩니다. 그 효과는 새로운 stream.read()
메서드와 'readable'
이벤트를 사용하지 않더라도 더 이상 'data'
청크 손실에 대해 걱정할 필요가 없다는 것입니다.
대부분의 애플리케이션은 계속 정상적으로 작동하지만, 다음과 같은 조건에서 특수한 상황이 발생합니다.
'data'
이벤트 리스너가 추가되지 않음.stream.resume()
메서드가 호출되지 않음.- 스트림이 쓰기 가능한 대상으로 파이프되지 않음.
예를 들어 다음 코드를 생각해 보십시오.
// 경고! 깨졌음!
net
.createServer(socket => {
// 'end' 리스너를 추가하지만 데이터를 소비하지 않음.
socket.on('end', () => {
// 여기에는 절대 도달하지 않음.
socket.end('메시지를 받았지만 처리되지 않았습니다.\n')
})
})
.listen(1337)
Node.js 0.10 이전에는 들어오는 메시지 데이터가 단순히 폐기되었습니다. 그러나 Node.js 0.10 이상에서는 소켓이 영원히 일시 중지된 상태로 유지됩니다.
이러한 상황에서의 해결 방법은 stream.resume()
메서드를 호출하여 데이터 흐름을 시작하는 것입니다.
// 해결 방법.
net
.createServer(socket => {
socket.on('end', () => {
socket.end('메시지를 받았지만 처리되지 않았습니다.\n')
})
// 데이터 흐름을 시작하고 폐기합니다.
socket.resume()
})
.listen(1337)
흐름 모드로 전환되는 새로운 Readable
스트림 외에도, 0.10 이전 스타일의 스트림은 readable.wrap()
메서드를 사용하여 Readable
클래스로 래핑할 수 있습니다.
readable.read(0)
실제로 데이터를 소비하지 않고 기본 읽기 가능 스트림 메커니즘의 새로 고침을 트리거해야 하는 경우가 있습니다. 이러한 경우, 항상 null
을 반환하는 readable.read(0)
을 호출할 수 있습니다.
내부 읽기 버퍼가 highWaterMark
보다 낮고 스트림이 현재 읽고 있지 않은 경우, stream.read(0)
을 호출하면 하위 수준의 stream._read()
호출이 트리거됩니다.
대부분의 애플리케이션에서는 이를 거의 수행할 필요가 없지만, 특히 Readable
스트림 클래스 내부에서 Node.js 내에서 이 작업이 수행되는 경우가 있습니다.
readable.push('')
readable.push('')
사용은 권장되지 않습니다.
객체 모드가 아닌 스트림에 0바이트 <string>, <Buffer>, <TypedArray> 또는 <DataView>를 푸시하면 흥미로운 부작용이 발생합니다. 이는 readable.push()
에 대한 호출이기 때문에 호출은 읽기 프로세스를 종료합니다. 그러나 인수가 빈 문자열이기 때문에 읽기 가능 버퍼에 데이터가 추가되지 않아 사용자가 소비할 것이 없습니다.
readable.setEncoding()
호출 후 highWaterMark
불일치
readable.setEncoding()
을 사용하면 객체 모드가 아닌 경우 highWaterMark
가 작동하는 방식이 변경됩니다.
일반적으로 현재 버퍼의 크기는 바이트 단위로 highWaterMark
와 비교됩니다. 그러나 setEncoding()
이 호출된 후 비교 함수는 버퍼의 크기를 문자 단위로 측정하기 시작합니다.
latin1
또는 ascii
를 사용하는 일반적인 경우에는 문제가 되지 않습니다. 그러나 멀티바이트 문자를 포함할 수 있는 문자열로 작업할 때는 이 동작에 유의하는 것이 좋습니다.