当前位置:   article > 正文

聊聊Nodejs中的核心模块:stream流模块(看看如何使用)_nodejs stream

nodejs stream

本篇文章带大家详细理解一下Nodejs中的stream流模块,介绍一下stream流概念及用法,希望对大家有所帮助!

stream流模块,是Node中非常核心的一个模块,其它模块如fs、http等都基于流stream模块的实例。

而对于大多前端小白在刚入门Node的学习过程中,对于流的概念及使用还是不太好清晰的理解,因为在前端的工作中似乎很少有过关于"流"处理相关的应用。

1. 流,是什么?

单纯“流”这个字,我们很容易产生水流,流动等的概念。

官方定义:流,是用于在 Node.js 中处理流数据的抽象接口

从官方的定义中,我们可以看出:

  • 流,是Node提供的一种处理数据的工具
  • 流,是Node中的一种抽象接口

准确的理解,流,可以理解为数据流,它是一种用来传输数据的手段,在一个应用程序中,流,是一种有序的,有起点和终点的数据流。

造成我们对stream流不太好的理解的主要原因就是,它是一种抽象的概念。

2. 流,的具体使用场景

为了让我们能够清楚的理解stream模块,我们首先来以具体的应用场景来说明stream模块有哪些实际应用之处。

stream流,在Node中主要应用在大量数据处理的需求上,如fs对大文件的读取和写入、http请求响应、文件的压缩、数据的加密/解密等应用。

我们以上面的图片说明流的使用,水桶可以理解为数据源,水池可以理解为数据目标,中间连接的管道,我们可以理解为数据流,通过数据流管道,数据从数据源流向数据目标。

3. 流的分类

在Node中,流被分为4类:可读流,可写流,双工流,转换流

  • Writable: 可以写入数据的流
  • Readable: 可以从中读取数据的流
  • DuplexReadable 和 Writable 的流
  • Transform: 可以在写入和读取数据时修改或转换数据的 Duplex 流

所有的流都是 EventEmitter 的实例。即我们可以通过事件机制监听数据流的变化。

4. 数据模式和缓存区

在深入学习4类流的具体使用之前,我们需要理解两个概念数据模式缓存区,有助于我们在接下来流的学习中更好的理解。

4.1 数据模式

Node.js API 创建的所有流都只对字符串和 Buffer(或 Uint8Array)对象进行操作。

4.2 缓存区

Writable和 Readable 流都将数据存储在内部缓冲区(buffer)中。

可缓冲的数据量取决于传给流的构造函数的 highWaterMark 选项, 对于普通的流,highWaterMark 选项指定字节的总数;对于在对象模式下操作的流,highWaterMark选项指定对象的总数。

highWaterMark 选项是阈值,而不是限制:它规定了流在停止请求更多数据之前缓冲的数据量。

当实现调用 stream.push(chunk) 时,数据缓存在 Readable 流中。 如果流的消费者没有调用 stream.read(),则数据会一直驻留在内部队列中,直到被消费。

一旦内部读取缓冲区的总大小达到 highWaterMark 指定的阈值,则流将暂时停止从底层资源读取数据,直到可以消费当前缓冲的数据

当重复调用 writable.write(chunk) 方法时,数据会缓存在 Writable 流中。

5. 可读流

5.1 流读取的流动与暂停

Readable 流以两种模式之一有效地运行:流动和暂停。

  • 流动模式:从系统底层读取数据并push()到缓存区,达到highWaterMark后 push() 会返回 false,资源停止流向缓存区,并触发data事件消费数据。

  • 暂停模式:所有的Readable流都是以Paused暂停模式开始,必须显式调用stream.read()方法来从流中读取数据。每一次数据达到缓存区都会触发一次 readable 事件,也就是每一次 push() 都会触发 readable。

  • 暂停模式切换到流动模式的方式:

    • 添加data事件句柄
    • 调用stream.resume()方法
    • 调用stream.pipe()方法将数据发送到 Writable
  • 流动模式切换到暂停模式的方式:

    • 如果没有管道目标,则通过调用 stream.pause() 方法。
    • 如果有管道目标,则删除所有管道目标。 可以通过调用 stream.unpipe()方法删除多个管道目标。

5.2 可读流常用示例

  1. import path from 'path';
  2. import fs, { read } from 'fs';
  3. const filePath = path.join(path.resolve(), 'files''text.txt');
  4. const readable = fs.createReadStream(filePath);
  5. // 如果使用 readable.setEncoding() 方法为流指定了默认编码,则监听器回调将把数据块作为字符串传入;否则数据将作为 Buffer 传入。
  6. readable.setEncoding('utf8');
  7. let str = '';
  8. readable.on('open', (fd=> {
  9.   console.log('开始读取文件')
  10. })
  11. // 每当流将数据块的所有权移交给消费者时,则会触发 'data' 事件
  12. readable.on('data', (data=> {
  13.   str += data;
  14.   console.log('读取到数据')
  15. })
  16. // 方法将导致处于流动模式的流停止触发 'data' 事件,切换到暂停模式。 任何可用的数据都将保留在内部缓冲区中。
  17. readable.pause();
  18. // 方法使被显式暂停的 Readable 流恢复触发 'data' 事件,将流切换到流动模式。
  19. readable.resume();
  20. // 当调用 stream.pause() 并且 readableFlowing 不是 false 时,则会触发 'pause' 事件。
  21. readable.on('pause', () => {
  22.   console.log('读取暂停')
  23. })
  24. // 当调用 stream.resume() 并且 readableFlowing 不是 true 时,则会触发 'resume' 事件。
  25. readable.on('resume', () => {
  26.   console.log('重新流动')
  27. })
  28. // 当流中没有更多数据可供消费时,则会触发 'end' 事件。
  29. readable.on('end', () => {
  30.   console.log('文件读取完毕');
  31. })
  32. // 当流及其任何底层资源(例如文件描述符)已关闭时,则会触发 'close' 事件。
  33. readable.on('close', () => {
  34.   console.log('关闭文件读取')
  35. })
  36. // 将 destWritable 流绑定到 readable,使其自动切换到流动模式并将其所有数据推送到绑定的 Writable。 数据流将被自动管理
  37. readable.pipe(destWriteable)
  38. // 如果底层流由于底层内部故障而无法生成数据,或者当流实现尝试推送无效数据块时,可能会发生这种情况。
  39. readable.on('error', (err) => {
  40.   console.log(err)
  41.   console.log('文件读取发生错误')
  42. })

6. 可写流

6.1 可写流的流动与暂停

writeable流 与 readable流 是比较相似的,数据流过来的时候,会直接写入到缓存区,当写入速度比较缓慢或者写入暂停时,数据流会在缓存区缓存起来;

当生产者写入速度过快,把队列池装满了之后,就会出现「背压」,这个时候是需要告诉生产者暂停生产的,当队列释放之后,writable流 会给生产者发送一个 drain 消息,让它恢复生产。

6.2 可写流示例

  1. import path from 'path';
  2. import fs, { read } from 'fs';
  3. const filePath = path.join(path.resolve(), 'files''text.txt');
  4. const copyFile = path.join(path.resolve(), 'files''copy.txt');
  5. let str = '';
  6. // 创建可读流
  7. const readable = fs.createReadStream(filePath);
  8. // 如果使用 readable.setEncoding() 方法为流指定了默认编码
  9. readable.setEncoding('utf8');
  10. // 创建可写流
  11. const wirteable = fs.createWriteStream(copyFile);
  12. // 编码
  13. wirteable.setDefaultEncoding('utf8');
  14. readable.on('open', (fd=> {
  15.   console.log('开始读取文件')
  16. })
  17. // 每当流将数据块的所有权移交给消费者时,则会触发 'data' 事件
  18. readable.on('data', (data=> {
  19.   str += data;
  20.   console.log('读取到数据');
  21.   // 写入
  22.   wirteable.write(data'utf8');
  23. })
  24. wirteable.on('open', () => {
  25.   console.log('开始写入数据')
  26. })
  27. // 如果对 stream.write(chunk) 的调用返回 false,则 'drain' 事件将在适合继续将数据写入流时触发。
  28. // 即生产数据的速度大于写入速度,缓存区装满之后,会暂停生产着从底层读取数据
  29. // writeable缓存区释放之后,会发送一个drain事件让生产者继续读取
  30. wirteable.on('drain', () => {
  31.   console.log('继续写入')
  32. })
  33. // 在调用 stream.end() 方法之后,并且所有数据都已刷新到底层系统,则触发 'finish' 事件。
  34. wirteable.on('finish', () => {
  35.   console.log('数据写入完毕')
  36. })
  37. readable.on('end', () => {
  38.   // 数据读取完毕通知可写流
  39.   wirteable.end()
  40. })
  41. // 当在可读流上调用 stream.pipe() 方法将此可写流添加到其目标集时,则触发 'pipe' 事件。
  42. // readable.pipe(destWriteable)
  43. wirteable.on('pipe', () => {
  44.   console.log('管道流创建')
  45. })
  46. wirteable.on('error', () => {
  47.   console.log('数据写入发生错误')
  48. })

更多node相关知识,请访问:nodejs 教程!!

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号