Fork me on GitHub

我所了解的 EventEmitter

1. Events 模块介绍

  • node 的特性:单线程 异步 非阻塞 I/O 基于事件驱动的运行在服务器端轻量、高效的脚本语言;
  • Events 模块是 nodejs 的核心模块,是 node 实现事件驱动的基础,node 中几乎所有的模块(如 http, fs 等)都继承该模块;

    1
    2
    3
    4
    const fs = require('fs');
    const EventEmitter = require('events');
    var stream = fs.createReadStream('./a.js');
    console.log(stream instanceof EventEmitter); // true
  • Events 模块实现了事件注册,通知等功能,是观察者模式(事件发布/订阅模式)的实现;

  • 只要继承 EventEmitter 类,就可拥有事件注册、触发事件等,所有能触发事件的对象都是 EventEmitter 类的实例, 例如:
    • net.Server 对象会在每次有新连接时触发事件;
    • fs.ReadStream 会在文件被打开时触发事件;
    • 流对象 会在数据可读时触发事件;

2. 相关 API

  1. 导入模块(创建emitter实例)

    1
    const EE = require('events');   // events 模块只提供了一个EventEmitter 类,所以返回的就是 EventEmitter
  2. 注册绑定事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 相当于增加观察者,addListener 和 on 这两个方式作用是一样的, addListener 是 on 方法的别名
    EE.addListener(event, listener);
    EE.on(event, listener);

    // 绑定的事件监听器只会执行一次,然后就会被删除掉
    EE.once(event, listener);

    // 添加了任意的监听函数时会触发newListener
    EE.on('newListener', (type, listener) => {
    });
  3. 触发事件

    1
    EE.emit(event);
  4. 移除事件

    1
    2
    3
    4
    5
    // 移除事件 event 的监听函数 listener
    EE.removeListener(event, listener);

    // 移除所有
    EE.removeAllListeners([event]);
  5. 其它api

    1
    2
    3
    4
    5
    6
    // 设置同一事件的监听器最大绑定数,默认情况下,超过 10 个就会警告;设置为 0,是无限制;
    EE.setMaxListeners(n);
    // 查看事件监听器数量
    EE.listenerCount();
    .......
    更多查看官网

3. EventEmitter 接口的部署

EventEmitter 接口可以部署在任意对象上,使得这些对象也能订阅和发布消息。

  • 直接继承 EventEmitter类
  • Node 内置模块 util 的 inherits 方法,实现继承
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
eg1: 借助util模块  
let EventEmitter = require('events');
let util = require('util');
function WriteStream () {
EventEmitter.call(this); // 继承私有属性
}
util.inherits(WriteStream, EventEmitter); //原型继承,继承公有
/**
* 定义一个WriteStream类,把EventEmitter类的方法添加到新创建的WriteStream 类中,使WriteStream的实例具有EventEmitter的方法。
* 相当于:
* Object.setPrototypeOf(ctor.prototype, superCtor.prototype); //指定的对象的原型
* ctor.prototype.__proto__ = superCtor.prototype;
*/

eg2: 借助es6 extends方法
const EventEmitter = require('events');
class MyEmitter extends EventEmitter {}
const myEmitter = new MyEmitter();

myEmitter.on('event', () {
console.log('触发了一个事件了');
});
myEmitter.emit('event'); //触发事件

当我们订阅了'event'事件后,可以在任何地方通过emit('event')来执行事件回调;

4. 异常处理

由于监听函数的执行是同步执行的,所以针对同步的代码可以通过try catch捕获到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const EventEmitter = require('events');
class MyEmitter extends EventEmitter{};
const myEmitter = new MyEmitter();
myEmitter.on('event', function() {
a.b();
console.log('listener1');
});
myEmitter.on('event', async function() {
console.log('listener2');
await new Promise((resolve, reject) => {
setTimeout(() => {
resolve(1);
}, 1000);
});
});
myEmitter.on('event', function() {
console.log('listener3');
});
try {
myEmitter.emit('event');
} catch(e) {
console.error('err');
}
console.log('end');
// 输出结果
end
err

但是如果把a.b();移到第二个listener里面的话就会出现下面的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const EventEmitter = require('events');
class MyEmitter extends EventEmitter{};
const myEmitter = new MyEmitter();
myEmitter.on('event', function() {
console.log('listener1');
});
myEmitter.on('event', async function() {
console.log('listener2');
a.b();
await new Promise((resolve, reject) => {
setTimeout(() => {
resolve(1);
}, 1000);
});
});
myEmitter.on('event', function() {
console.log('listener3');
});
try {
myEmitter.emit('event');
} catch(e) {
console.error('err');
}
console.log('end');

// 输出结果
listener1
listener2
listener3
end
(node:9046) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): ReferenceError: a is not defined
(node:9046) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code

async 函数的特点就在于它的返回值是一个 Promise,如果函数体内出现错误的话 Promise 就是 reject 状态。Node.js 不推荐忽略 reject 的 promise,而 EventEmitter 对于各监听函数的返回值是忽略的,所以才会出现上面的情况。明白了问题的原因后我们就可以确定对于上面的情况的话,需要在第二个 listener 里面增加 try catch 的处理。

当事件被触发时,如果没有与该事件绑定的函数的话,该事件会被静默忽略掉,但是如果事件的名称是 error 的话,没有与此相关的事件处理的话,程序就会 crash 退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const EventEmitter = require('events');
class MyEmitter extends EventEmitter{};
const myEmitter = new MyEmitter();
myEmitter.on('event', function(data) {
console.log(data);
});
myEmitter.emit('error');

events.js:199
throw err;
^

Error [ERR_UNHANDLED_ERROR]: Unhandled error.
at MyEmitter.emit (events.js:197:19)
at Object.<anonymous> (/Users/xiji/workspace/learn/event-emitter/b.js:7:11)
at Module._compile (module.js:641:30)
at Object.Module._extensions..js (module.js:652:10)
at Module.load (module.js:560:32)
at tryModuleLoad (module.js:503:12)
at Function.Module._load (module.js:495:3)
at Function.Module.runMain (module.js:682:10)
at startup (bootstrap_node.js:191:16)
at bootstrap_node.js:613:3

只有添加了针对 error 事件的处理函数的话程序才不会退出了。

另外一种方式是 process 监听 uncaughtException 事件,但是这并不是推荐的做法,因为 uncaughtException 事件是非常严重的,通常情况下在 uncaughtException 的处理函数里面一般是做一些上报或者清理工作,然后执行 process.exit(1) 让程序退出了。

1
2
3
4
5
6
7
8
9
10
process.on('uncaughtException', function(err) {  
console.error('uncaught exception:', err.stack || err);
// orderly close server, resources, etc.
closeEverything(function(err) {
if (err)
console.error('Error while closing everything:', err.stack || err);
// exit anyway
process.exit(1);
});
});

5. 监听函数的执行顺序

除了 on 的方式(向后追加),我们还可以使用 prependListene r的方法来(向前插入)增加监听函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const EventEmitter = require('events');
class MyEmitter extends EventEmitter{};
const myEmitter = new MyEmitter();
myEmitter.prependListener('event', function() {
console.log('listener1');
});
myEmitter.prependListener('event', async function() {
console.log('listener2');
});
myEmitter.prependListener('event', function() {
console.log('listener3');
});
myEmitter.emit('event');
console.log('end');
// 输出结果
listener3
listener2
listener1
end

6. this 指向

监听函数如果采用如下写法的话,那么 this 的指向就是事件的 emitter

1
2
3
4
5
6
7
const EventEmitter = require('events');
class MyEmitter extends EventEmitter{};
const myEmitter = new MyEmitter();
myEmitter.on('event', function(a, b) {
console.log(a, b, this === myEmitter); // a b true
});
myEmitter.emit('event', 'a', 'b');

如果是用箭头函数写法的话,那么 this 就不是指向 emitter 了

1
2
3
4
5
6
7
const EventEmitter = require('events');
class MyEmitter extends EventEmitter{};
const myEmitter = new MyEmitter();
myEmitter.on('event', (a, b) => {
console.log(a, b, this === myEmitter); // a b false
});
myEmitter.emit('event', 'a', 'b');

7. 简单实现一个自己的 myEventEmitter 库

首先实现这个 myEventEmitter 库,需要先对观察者模式进行了解,这样写起来才会得心应手;

观察者模式是管理消息分发的一种方式,这种模式中,发布消息的一方不需要知道这个消息会给谁,而订阅一方也无需知道消息的来源。

简单原理描述:this.events 维护一个信号对应函数的列表,通过这个索引,可以对这个信号进行增、删等操作,你只需要按照这个信号执行对象的回调函数;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class MyEvents {
constructor () {
this.events = {} //存储事件监听函数
this.maxListeners = 10 //一种函数类型,最大监听函数数量
}

setMaxListeners (maxNum) {
this.maxListeners = maxNum
}

getMaxListeners () {
return this.maxListeners
}

listeners (event) {
return this.events[event]
}

addListener (type, listener) {
if (this.events[type]) {
if (this.maxListeners != 0 && this.events[type].length > this.maxListeners) {
return console.error(`该${type}事件类型的listteners超出限制,使用emitter.setMaxListeners() 来增加添加事件监听数量。`)
}
this.events[type].push(listener)
} else {
this.events[type] = [listener]
}
}

once (type, listener) {
//执行后立即销毁
let wrapper = (...rest) => {
listener.apply(this, rest)
this.removeListener(type, wrapper)
}
this.addListener(type, wrapper)
}

removeListener (type, listener) {
if (this.events[type]) {
this.events[type] = this.events[type].filter(ev => {
ev != listener
})
//抛弃掉等于listener的
}
}

removeAllListener (type) {
delete this.events[type]
}

emit (type, ...rest) {
this.events[type] && this.events[type].forEach(listener => {
listener.apply(this, rest)
})
}
}

MyEvents.prototype.on = MyEvents.prototype.addListener
module.exports = MyEvents

学习链接:

-------------本文结束感谢您的阅读-------------