深入了解Node.js中的4种 stream

本篇文章带大家了解一下node.js中的4种 stream,看看怎么解决爆缓冲区的“背压”问题,有需要的朋友可以去学习了解一下~

深入了解Node.js中的4种 stream

把一个东西从 A 搬到 B 该怎么搬呢?

抬起来,移动到目的地,放下不就行了么。

那如果这个东西有一吨重呢?

那就一部分一部分的搬。

其实 IO 也就是搬东西,包括网络的 IO、文件的 IO,如果数据量少,那么直接传送全部内容就行了,但如果内容特别多,一次性加载到内存会崩溃,而且速度也慢,这时候就可以一部分一部分的处理,这就是流的思想。【推荐学习:《nodejs 教程》】

各种语言基本都实现了 stream 的 api,Node.js 也是,stream api 是比较常用的,下面我们就来探究一下 stream。

本文会回答以下问题:

Node.js 的 4 种 stream 是什么生成器如何与 Readable Stream 结合stream 的暂停和流动什么是背压问题,如何解决

Node.js 的 4种 stream

流的直观感受

从一个地方流到另一个地方,显然有流出的一方和流入的一方,流出的一方就是可读流(readable),而流入的一方就是可写流(writable)。

1.png

当然,也有的流既可以流入又可以流出,这种叫做双工流(duplex)

2.png

既然可以流入又可以流出,那么是不是可以对流入的内容做下转换再流出呢,这种流叫做转换流(transform)

3.png

duplex 流的流入和流出内容不需要相关,而 transform 流的流入和流出是相关的,这是两者的区别。

流的 api

Node.js 提供的 stream 就是上面介绍的那 4 种:

const stream = require('stream');// 可读流const Readable = stream.Readable;// 可写流const Writable = stream.Writable;// 双工流const Duplex = stream.Duplex;// 转换流const Transform = stream.Transform;

登录后复制

它们都有要实现的方法:

Readable 需要实现 _read 方法来返回内容Writable 需要实现 _write 方法来接受内容Duplex 需要实现 _read 和 _write 方法来接受和返回内容Transform 需要实现 _transform 方法来把接受的内容转换之后返回

我们分别来看一下:

Readable

Readable 要实现 _read 方法,通过 push 返回具体的数据。

const Stream = require('stream');const readableStream = Stream.Readable();readableStream._read = function() {    this.push('阿门阿前一棵葡萄树,');    this.push('阿东阿东绿的刚发芽,');    this.push('阿东背着那重重的的壳呀,');    this.push('一步一步地往上爬。')    this.push(null);}readableStream.on('data', (data)=> {    console.log(data.toString())});readableStream.on('end', () => {    console.log('done~');});

登录后复制

当 push 一个 null 时,就代表结束流。

执行效果如下:

4.png

创建 Readable 也可以通过继承的方式:

const Stream = require('stream');class ReadableDong extends Stream.Readable {    constructor() {        super();    }    _read() {        this.push('阿门阿前一棵葡萄树,');        this.push('阿东阿东绿的刚发芽,');        this.push('阿东背着那重重的的壳呀,');        this.push('一步一步地往上爬。')        this.push(null);    }}const readableStream = new ReadableDong();readableStream.on('data', (data)=> {    console.log(data.toString())});readableStream.on('end', () => {    console.log('done~');});

登录后复制

可读流是生成内容的,那么很自然可以和生成器结合:

const Stream = require('stream');class ReadableDong extends Stream.Readable {    constructor(iterator) {        super();        this.iterator = iterator;    }    _read() {        const next = this.iterator.next();        if(next.done) {            return this.push(null);        } else {            this.push(next.value)        }    }}function *songGenerator() {    yield '阿门阿前一棵葡萄树,';    yield '阿东阿东绿的刚发芽,';    yield '阿东背着那重重的的壳呀,';    yield '一步一步地往上爬。';}const songIterator = songGenerator();const readableStream = new ReadableDong(songIterator);readableStream.on('data', (data)=> {    console.log(data.toString())});readableStream.on('end', () => {    console.log('done~');});

登录后复制

这就是可读流,通过实现 _read 方法来返回内容。

Writable

Writable 要实现 _write 方法,接收写入的内容。

const Stream = require('stream');const writableStream = Stream.Writable();writableStream._write = function (data, enc, next) {   console.log(data.toString());   // 每秒写一次   setTimeout(() => {       next();   }, 1000);}writableStream.on('finish', () => console.log('done~'));writableStream.write('阿门阿前一棵葡萄树,');writableStream.write('阿东阿东绿的刚发芽,');writableStream.write('阿东背着那重重的的壳呀,');writableStream.write('一步一步地往上爬。');writableStream.end();

登录后复制

接收写入的内容,打印出来,并且调用 next 来处理下一个写入的内容,这里调用 next 是异步的,可以控制频率。

跑了一下,确实可以正常的处理写入的内容:

5.png

这就是可写流,通过实现 _write 方法来处理写入的内容。

Duplex

Duplex 是可读可写,同时实现 _read 和 _write 就可以了

const Stream = require('stream');var duplexStream = Stream.Duplex();duplexStream._read = function () {    this.push('阿门阿前一棵葡萄树,');    this.push('阿东阿东绿的刚发芽,');    this.push('阿东背着那重重的的壳呀,');    this.push('一步一步地往上爬。')    this.push(null);}duplexStream._write = function (data, enc, next) {    console.log(data.toString());    next();}duplexStream.on('data', data => console.log(data.toString()));duplexStream.on('end', data => console.log('read done~'));duplexStream.write('阿门阿前一棵葡萄树,');duplexStream.write('阿东阿东绿的刚发芽,');duplexStream.write('阿东背着那重重的的壳呀,');duplexStream.write('一步一步地往上爬。');duplexStream.end();duplexStream.on('finish', data => console.log('write done~'));

登录后复制

整合了 Readable 流和 Writable 流的功能,这就是双工流 Duplex。

6.png

Transform

Duplex 流虽然可读可写,但是两者之间没啥关联,而有的时候需要对流入的内容做转换之后流出,这时候就需要转换流 Transform。

Transform 流要实现 _transform 的 api,我们实现下对内容做反转的转换流:

const Stream = require('stream');class TransformReverse extends Stream.Transform {  constructor() {    super()  }  _transform(buf, enc, next) {    const res = buf.toString().split('').reverse().join('');    this.push(res)    next()  }}var transformStream = new TransformReverse();transformStream.on('data', data => console.log(data.toString()))transformStream.on('end', data => console.log('read done~'));transformStream.write('阿门阿前一棵葡萄树');transformStream.write('阿东阿东绿的刚发芽');transformStream.write('阿东背着那重重的的壳呀');transformStream.write('一步一步地往上爬');transformStream.end()transformStream.on('finish', data => console.log('write done~'));

登录后复制

跑了一下,效果如下:

7.png

流的暂停和流动

我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。

8.png

背压

但是 read 和 write 都是异步的,如果两者速率不一致呢?

如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。

而如果  Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。

这种读入速率大于写入速率的现象叫做“背压”,或者“负压”。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。

这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。

9.png

解决背压

怎么解决这种读写速率不一致的问题呢?

当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。

readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。

当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。

readableStream.readableFlowing = false;let data;while((data = readableStream.read()) != null) {    console.log(data.toString());}

登录后复制

但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。

当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:

true: 数据已经写入目标false:目标不可写入,暂时放在缓冲区

我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:

const rs = fs.createReadStream(src);const ws = fs.createWriteStream(dst);rs.on('data', function (chunk) {    if (ws.write(chunk) === false) {        rs.pause();    }});rs.on('end', function () {    ws.end();});ws.on('drain', function () {    rs.resume();});

登录后复制

这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。

pipe 有背压问题么?

平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。

const rs = fs.createReadStream(src);const ws = fs.createWriteStream(dst);rs.pipe(ws);

登录后复制

总结

流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。

Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。

创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)

当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。

流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!

更多编程相关知识,请访问:编程入门!!

以上就是深入了解Node.js中的4种 stream的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2634032.html

(0)
上一篇 2025年3月7日 04:27:38
下一篇 2025年2月22日 15:06:31

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • Node.js中如何利用node-cron来调度任务?

    没有一个开发人员愿意把所有时间都花在繁琐的任务上,比如系统维护和管理、日常数据库备份以及定期下载文件和电子邮件。你更愿意专注于富有成效的工作,而不是跟踪这些烦人的琐事何时需要完成。【推荐学习:《nodejs 教程》】 这时就需要使用到任务调…

    2025年3月7日
    000
  • 聊聊Node.js Buffer中的encoding(编码)

    本篇文章带大家了解一下node.js buffer中的encoding,希望对大家有所帮助! 计算机最小的单位是一个位,也就是 0 和 1,在硬件上通过高低电平来对应。但是只有一位表示的信息太少了,所以又规定了 8 个位为一个字节,之后数字…

    2025年3月7日
    200
  • nodejs是做什么的

    nodejs是能够在服务器端运行JavaScript的开放源代码、跨平台运行环境;Node.js采用Google开发的V8运行代码,使用事件驱动、非阻塞和异步输入输出模型等技术来提高性能,可优化应用程序的传输量和规模。 本文操作环境:win…

    2025年3月7日
    200
  • 浅析Node.js api的 POSIX 标准

    posix 是什么?有哪些内容?下面本篇文章带大家了解一下node.js api的 posix 标准、node.js api的特点,希望对大家有所帮助! 【推荐学习:《nodejs 教程》】 如果你用过 Node.js 的 api,会不会觉…

    2025年3月7日
    200
  • 聊聊利用Node.js 的多线程能力怎么做异步计算

    怎么做异步计算?下面本篇文章给大家介绍一下利用浏览器和 node.js 的多线程能力做异步计算的方法,希望对大家有所帮助! 都说 Node.js 可以实现高性能的服务器,那什么是高性能呢? 所有的软件代码最终都是通过 CPU 来跑的,能不能…

    2025年3月7日
    200
  • 怎么利用Node.js查看操作系统及其版本号

    怎么利用node.js查看操作系统及其版本号?下面本篇文章给大家介绍一下node.js中判断操作系统、获取windows和mac系统的node.js版本号的方法。 获取操作系统 在 Node.js 中判断操作系统是非常简单的,用 proce…

    2025年3月7日
    200
  • 浅谈利用Node.js如何获取WI-FI密码

    利用node.js如何获取wi-fi密码?下面本篇文章给大家介绍一下使用node.js获取wi-fi密码的方法,希望对大家有所帮助! 【推荐学习:《nodejs 教程》】 演示效果 全局安装wifi-password-cli依赖 npm i…

    2025年3月7日
    200
  • 浅谈使用node.js怎么搭建本地服务器

    使用node.js怎么搭建本地服务器?下面本篇文章就来给大家介绍一下node.js搭建本地服务器的方法,希望对大家有所帮助! node.js是基于JavaScript的一门后端语言,前端小伙伴儿可以很快上手,并自己搭建一个本地的服务器。一起…

    2025年3月7日 编程技术
    200
  • 深入了解Node.js中的异步编程,分享四种解决方案

    有异步i/o,必有异步编程!今天来学习node.js里的异步编程! 异步编程概述 曾经的单线程模型在同步I/O的影响下,由于I/O调用缓慢,在应用层面导致CPU与I/O无法重叠进行。为了照顾编程人员的阅读思维习惯,同步I/O盛行了很多年。【…

    2025年3月7日 编程技术
    200
  • nodejs怎么安装gulp

    安装方法:1、下载并安装Nodejs;2、配置npm的全局路径;3、打开“Node.js command prompt”命令行,执行“npm install –g gulp”命令即可全局安装gulp。 本教程操作环境:windows7系统、…

    2025年3月7日
    200

发表回复

登录后才能评论