Event Emitter

使用 Node.js 監控資料夾變更,並以 Event Emitter 封裝 File Watcher

Node.js 監控資料夾

在 Node.js 中,我們可以透過 fs.watch 來監控資料夾是否有異動,不需要寫太多程式碼。以下是一個基本範例:

1
2
3
4
5
6
7
8
9
10
11
const fs = require('fs');
const path = require('path');
const directoryToWatch = './watched';

fs.watch(directoryToWatch, (eventType, filename) => {
if (filename) {
console.log(`File ${filename} has changed with event type: ${eventType}`);
} else {
console.log('filename not provided');
}
});

這段程式會持續監控 ./watched 資料夾的內容變化,當有新增、修改或刪除檔案時,就會觸發 callback 函式。

根據副檔名處理不同邏輯

如果我們想要針對不同的副檔名做不同處理,可以進一步使用 path.extname() 判斷檔案類型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const fs = require('fs');
const path = require('path');
const directoryToWatch = './watched';

fs.watch(directoryToWatch, (eventType, filename) => {
if (filename) {
const fileExtension = path.extname(filename);
switch (fileExtension) {
case '.js':
// ...
break;
case '.txt':
// ...
break;
case '.json':
// ...
break;
default:
console.log(`Ignoring file ${filename} with extension ${fileExtension}`);
}
} else {
console.log('filename not provided');
}
});

不過隨著專案變大,副檔名的判斷邏輯(if-else 或 switch)會越來越複雜、難以維護。這時就可以考慮封裝成 Event Driven 的架構,提升可讀性與擴充性。

將邏輯封裝成 Pub/Sub 模型的 Watcher

Node.js 提供的 EventEmitter 非常適合實作發布/訂閱(Publish/Subscribe)模型。EventEmitter 詳細介紹可以參考這篇文章:

使用 Node.js 的 EventEmitter 建立事件監聽機制:概念 & 實作

fs.watch 偵測到檔案更動,我們使用 .emit(extension) 來「發布」這個事件,而訂閱該副檔名的使用者透過 .on(extension, callback) 就能被「通知」。這讓我們可以針對特定副檔名建立模組化的監聽邏輯,使架構更清晰、易於擴充。

我們利用 class 繼承 EventEmitter,在 constructor 參數傳入資料夾,並呼叫 start() 時開始監控:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// watcher.js
const fs = require('fs');
const path = require('path');
const events = require('events');

class Watcher extends events.EventEmitter {
constructor(watchDir) {
super();
this.watchDir = watchDir;
}

start() {
fs.watch(this.watchDir, (eventType, filename) => {
if (!filename) {
console.warn('Warning: filename not provided');
return;
}
const fileExtension = path.extname(filename);
this.emit(fileExtension, eventType, filename);
})
}
}

module.exports = Watcher;

這段程式碼會根據檔案副檔名觸發對應的事件,達成根據檔案類型處理相對應的 callback。我們建立一個 watcher 並註冊感興趣的副檔名,並呼叫 start() 開始監控的資料夾:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// index.js
const Watcher = require('./watcher');

const directoryToWatch = './watched';
const watcher = new Watcher(directoryToWatch);

watcher.on('.js', (eventType, filename) => {
// ...
});

watcher.on('.txt', (eventType, filename) => {
// ...
});

watcher.on('.json', (eventType, filename) => {
// ...
});

watcher.start();

這樣的寫法使得每個副檔名的處理邏輯可以被模組化,也可以讓多個檔案處理者獨立訂閱自己關心的事件。

Conclusion

透過封裝 fs.watch 並結合 EventEmitter 的事件機制,我們實作了一個簡單且可擴充的資料夾監控系統。這個模式讓你可以專注在各類檔案的邏輯處理,而不需要在程式碼中寫太複雜的 if-else 部分。這樣的架構能讓你在不干擾其他邏輯的情況下,自由地新增更多檔案類型處理器,提升模組化與維護性。

Reference

使用 Node.js 的 EventEmitter 建立事件監聽機制:概念 & 實作

概念

在 Node.js 中,建立 EventListener 可以使用 EventEmitter,概念與 publish/subscribe 非常相似,如果之前曾經使用過 Redis 的 Pub/Sub 機制,會更容易理解。

假設有一個人先訂閱(subscribe)了一個名為 ‘xxx’ 的頻道(channel),之後當有人發布(publish)訊息到 ‘xxx’ 頻道時,訂閱者就會看到發布者發出的訊息。

Event Listener

回到 Node.js 的 Event Listener 機制,我們可以透過 EventEmitter 建立一個 publish/subscribe 物件。執行物件的 .on 方法就類似於訂閱(subscribe),而 .emit 方法則類似於發布(publish)。以下是範例程式碼:

1
2
3
4
5
6
7
8
9
10
const EventEmitter = require('events');
const channel = new EventEmitter();

// 訂閱 'join' 事件:任何人加入時會通知
channel.on('join', (message) => {
console.log(`Received message: ${message}`);
});

// 發布 'join' 事件
channel.emit('join', 'Hello, everyone!');

在這個範例中,我們使用 EventEmitter 建立了一個 channel 並且監聽 ‘join’ 事件。當有使用者加入時,會透過 .emit(‘join’) 通知 ‘join’ 事件。我們可以建立一個 HTTP server 並稍微修改一下來驗證我們的想法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// server.js
const net = require('net');

const EventEmitter = require('events').EventEmitter;
const channel = new EventEmitter;

channel.clients = {};

// 訂閱 'join' 事件:任何人加入時會通知
channel.on('join', (id, client) => {
channel.clients[id] = client;
console.log(`${id} join`);
});

const server = net.createServer(client => {
// 使用 `IP:Port` 當作 ID
const id = `${client.remoteAddress}:${client.remotePort}`;

// 通知 'join' 事件
channel.emit('join', id, client);
})

server.listen(30000);

.emit 不僅用來觸發事件,還可以同時傳入多個參,當你呼叫 .emit('join', arg1, arg2, ...) 時,所有註冊了這個事件的監聽器 .on('join', (arg1, arg2) => {...}) 都會依序被呼叫,且能接收到這些參數。

net.createServer 是使用 Node.js 的 net module 來建立一個 TCP server。當有 client 進來時會執行 callback function,這個 callback 第一個參數代表接收到一個 net.Socket 的物件。

在 socket 的 callback 中,可以透過 client.remoteAddressclient.remotePort 來獲取連線 client 的 IP 與 port,組合成唯一個 id。然後使用 .emit('join', id, client) 觸發一個 join 事件,同時把這個 idclient 傳入給 join 事件的 callback,最後儲存在 channel.clients 中。

我們來測試一下,先開一個 server,然後使用 telnet localhost 30000 來連到 server,在 server 上可以看到 xxx join 的訊息。

1
2
3
4
5
6
7
8
9
# 先開一個 server
$ node server.js
::ffff:127.0.0.1:37282 join

# 然後加入使用者
$ telnet localhost 30000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

Error Event

在前面的例子,這個 join 事件名稱是可以自定義的,可以是任何字串,所以如果觸發一個不存在的事件 .emit('notExist'),其實是不會發生什麼事情的,但是有一個例外:error 事件。如果沒有人註冊 error 事件,但有人在 error 事件中發出訊息,程式會直接停止運行並印出錯誤訊息。

1
2
3
4
5
6
const events = require('events');
const myEmitter = new events.EventEmitter();
myEmitter.on('error', err => {
console.log(`ERROR: ${err.message}`);
});
myEmitter.emit('error', new Error('Something is wrong.'));

如果在執行 error callback 時又發生錯誤,則會丟出 uncaughtException 錯誤,可以透過 process.on('uncaughtException') 來捕捉這個錯誤。

1
2
3
process.on('uncaughtException', err => {
console.error('There was an uncaught error', err);
});

上述的例子可以增加這兩個事件:

1
2
3
4
5
6
7
process.on('uncaughtException', err => {
console.log("uncaughtException", err.stack);
});

channel.on('error', err => {
console.log("channel error", err.message)
});

廣播給所有使用者

現在 server 建立好了,我們想要讓多個使用者加入,如果其中有一個 client 發話,其他 client 都可以收到訊息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const net = require('net');

const EventEmitter = require('events').EventEmitter;
const channel = new EventEmitter;

channel.clients = {};

channel.on('join', (id, client) => {
channel.clients[id] = client;
channel.on('broadcast', (senderId, message) => {
if (id != senderId) {
channel.clients[id].write(message);
}
});
console.log(`${id} join`);
channel.emit('broadcast', id, `${id} join\n`); // 通知有人加入群組
});

const server = net.createServer(client => {
const id = `${client.remoteAddress}:${client.remotePort}`;
channel.emit('join', id, client);
client.on('data', data => {
data = data.toString();
channel.emit('broadcast', id, data);
});
})

server.listen(30000);

join 事件中,當有 client 加入時,會針對每個 client 註冊 broadcast 事件。當有人發送廣播訊息時 channel.emit('broadcast', ...),就會觸發 callback 事件。這個 callback 需要接收兩個參數:

  • senderId:發送者的 id。
  • message:廣播的訊息。

如果該 client 不是訊息的發送者,則會使用 channel.clients[id].write(message) 將訊息送給這個 client。

最後每當 client 連線到 server 並傳送資料時,在 data 事件中 client.on('data', callback),這個 callback 就會被觸發。這個 callback 會透過我們前面新增的 broadcast 事件來廣播給所有人。

.on('data', callback) 收到的資料通常是 Buffer,所以會先轉成 string。

我們來測試一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# server
$ node server.js
::ffff:127.0.0.1:40542 join
::ffff:127.0.0.1:40556 join
::ffff:127.0.0.1:40572 join

# user1 再另外開三個 Terminal 分別建立連線
$ telnet localhost 30000
::ffff:127.0.0.1:40556 join
::ffff:127.0.0.1:40572 join
akiicat

# user2
$ telnet localhost 30000
::ffff:127.0.0.1:40572 join
akiicat

# user3
$ telnet localhost 30000
akiicat

先開一個 server,然後分別加入使用者並發話,記得最後要按 enter 才會發送,就可以看到發送的訊息會廣播到其他連線的 client。

如果你持續增加使用者,超過 10 個人訂閱同一個事件時,Node.js 會印出警告你,但可以透過 .setMaxListeners(50) 來增加上限。

1
eventEmitter.setMaxListeners(50);

離開群組 removeListener

現在我們的使用者可以加入群組,並且可以廣播訊息給所有人,看起來很不錯,但是使用者加入後卻沒有辦法離開。我們希望使用者輸入 leave 的時候,會將自己從群組中移除,並且斷開連線。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
channel.on('leave', (id) => {
channel.emit('broadcast', id, `${id} has left\n`); // 通知有人離開
channel.clients[id].destroy(); // 斷開連線
delete channel.clients[id];
});

const server = net.createServer(client => {
const id = `${client.remoteAddress}:${client.remotePort}`;
channel.emit('join', id, client);
client.on('data', data => { data = data.toString();
if (data.trim() === 'leave') { // 如果使用者輸入 leave 則觸發 leave 事件
channel.emit('leave', id);
} else {
channel.emit('broadcast', id, data);
}
});
})

看起來功能已經完成了,但是有一個小問題。在 join 事件中,使用者訂閱的 broadcast 事件沒有清除。如果有人離開之後,就會在後續的廣播嘗試寫入不存在的連線,這會造成 Node.js 的錯誤。

因此我們在 leave 時需要用 .removeListener 來取消訂閱,注意 removeListener 帶入的 callback 必須跟 on 的 callback 一樣,

1
2
3
4
// removeListener 帶入的 callback 需要跟 on 的 callback 一樣
const callback = () => {...}
.on('broadcast', callback)
.removeListener('broadcast', callback)

所以我們需要修改一下 server.js

  • 新增 channel.subscriptions,並在 join 事件中,每當有使用者加入時儲存每個 client 的 callback。
  • leave 事件中 removeListener 將對應使用者的 broadcast callback 移除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
const net = require('net');

const EventEmitter = require('events').EventEmitter;
const channel = new EventEmitter;

channel.clients = {};
channel.subscriptions = {}; // 新增 subscriptions 物件用來儲存 client 的 broadcast callback

channel.on('join', (id, client) => {
channel.clients[id] = client;
channel.subscriptions[id] = (senderId, message) => { // 儲存 client 的 broadcast callback
if (id != senderId) {
channel.clients[id].write(message);
}
};
channel.on('broadcast', channel.subscriptions[id]); // client 訂閱 broadcast 事件
channel.emit('broadcast', id, `${id} join\n`); // client 訂閱 broadcast 事件
});

channel.on('leave', (id) => {
channel.removeListener('broadcast', channel.subscriptions[id]); // client 取消訂閱 broadcast 事件
channel.emit('broadcast', id, `${id} has left\n`);
channel.clients[id].destroy();
delete channel.clients[id];
delete channel.subscriptions[id]; // 從物件中移除
});

const server = net.createServer(client => {
const id = `${client.remoteAddress}:${client.remotePort}`;
channel.emit('join', id, client);
client.on('data', data => { data = data.toString();
if (data.trim() === 'leave') {
channel.emit('leave', id);
} else {
channel.emit('broadcast', id, data);
}
});
})

server.listen(30000);

在 Terminal 測試使用者能否正常離開群組:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ node server.js

# user1 使用者離開
$ telnet localhost 30000
::ffff:127.0.0.1:54538 join
::ffff:127.0.0.1:54554 join
Hello
leave
Connection closed by foreign host.

# user2 使用者離開後照樣可正常廣播
$ telnet localhost 30000
::ffff:127.0.0.1:54554 join
Hello
::ffff:127.0.0.1:54528 has left
Hi

# user3
$ telnet localhost 30000
Hello
::ffff:127.0.0.1:54528 has left
Hi

關閉伺服器 removeAllListeners

現在我們新增一個新功能,輸入 shutdown 指令讓所有人取消訂閱,並斷開所有連線。在前一章節所學的 removeListener,我們可以使用迴圈將每個使用者從 broadcast 事件移除。然而,我們可以更間單的使用 .removeAllListeners('broadcast') ,來一次性移除所有訂閱 broadcast 事件上的使用者。最後我們的完整程式碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const net = require('net');

const EventEmitter = require('events').EventEmitter;
const channel = new EventEmitter;

channel.clients = {};
channel.subscriptions = {}; // 新增 subscriptions 物件用來儲存 client 的 broadcast callback

channel.on('join', (id, client) => {
channel.clients[id] = client;
channel.subscriptions[id] = (senderId, message) => { // 儲存 client 的 broadcast callback
if (id != senderId) {
channel.clients[id].write(message);
}
};
channel.on('broadcast', channel.subscriptions[id]); // client 訂閱 broadcast 事件
channel.emit('broadcast', id, `${id} join\n`); // client 訂閱 broadcast 事件
});

channel.on('leave', (id) => {
channel.removeListener('broadcast', channel.subscriptions[id]); // client 取消訂閱 broadcast 事件
channel.emit('broadcast', id, `${id} has left\n`);
channel.clients[id].destroy();
delete channel.clients[id];
delete channel.subscriptions[id]; // 從物件中移除
});

channel.on('shutdown', () => {
channel.emit('broadcast', '', 'Server shutdown');
channel.removeAllListeners('broadcast');
Object.keys(channel.clients).forEach(function(id) { channel.clients[id].destroy();
delete channel.clients[id];
delete channel.subscriptions[id];
})
});

const server = net.createServer(client => {
const id = `${client.remoteAddress}:${client.remotePort}`;
channel.emit('join', id, client);
client.on('data', data => { data = data.toString();
if (data.trim() === 'shutdown') {
channel.emit('shutdown', id);
} else if (data.trim() === 'leave') {
channel.emit('leave', id);
} else {
channel.emit('broadcast', id, data);
}
});
})

server.listen(30000);

測試一下,輸入 shutdown 後所有人將會斷開連線:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# server
$ node server.js

# user1
$ telnet localhost 30000
::ffff:127.0.0.1:51130 join
::ffff:127.0.0.1:51136 join
shutdown
Server shutdownConnection closed by foreign host.

# user2
$ telnet localhost 30000
::ffff:127.0.0.1:51136 join
Server shutdownConnection closed by foreign host.

# user3
$ telnet localhost 30000
Server shutdownConnection closed by foreign host.

只要 Event Loop 中還有待處理的非同步操作(例如未完成的 I/O、定時器、網絡請求等),Node.js process 就不會自動退出。這對於需要長期運行、持續監聽請求的 Web 伺服器來說是理想的行為;而對於命令行工具或一次性任務,則可能需要在所有非同步操作完成後主動退出。

Reference