80 行程式碼實現簡易 RxJS
RxJS 是一個響應式的庫,它接收從事件源發出的一個個事件,經過處理管道的層層處理之後,傳入最終的接收者,這個處理管道是由操作符組成的,開發者只需要選擇和組合操作符就能完成各種非同步邏輯,極大簡化了非同步程式設計。除此以外,RxJS 的設計還遵循了函式式、流的理念。
直接講概念比較難理解,不如我們實現一個簡易的 RxJS 再來看這些。
RxJS 的使用
RxJS 會對事件源做一層封裝,叫做 Observable,由它發出一個個事件。
比如這樣:
const source = new Observable((observer) => {
let i = 0;
setInterval(() => {
observer.next(++i);
}, 1000);
});
複製程式碼
在回撥函式裡面設定一個定時器,不斷通過 next 傳入事件。
這些事件會被接受者監聽,叫做 Observer。
const subscription = source.subscribe({
next: (v) => console.log(v),
error: (err) => console.error(err),
complete: () => console.log('complete'),
});
複製程式碼
observer 可以接收 next 傳過來的事件,傳輸過程中可能有 error,也可以在這裡處理 error,還可以處理傳輸完成的事件。
這樣的一個監聽或者說訂閱,叫做 Subscription。
可以訂閱當然也可以取消訂閱:
subscription.unsubscribe();
複製程式碼
取消訂閱時的回撥函式是在 Observable 裡返回的:
const source = new Observable((observer) => {
let i = 0;
const timer = setInterval(() => {
observer.next(++i);
}, 1000);
return function unsubscribe() {
clearInterval(timer);
};
});
複製程式碼
傳送事件、監聽事件只是基礎,處理事件的過程才是 RxJS 的精髓,它設計了管道的概念,可以用操作符 operator 來組裝這個管道:
source.pipe(
map((i) => ++i),
map((i) => i * 10)
).subscribe(() => {
//...
})
複製程式碼
事件經過管道之後才會傳到 Observer,在傳輸過程中會經過一個個操作符的處理。
比如這裡的處理邏輯是,對傳過來的資料加 1,然後再乘以 10。
綜上,使用 RxJS 的程式碼就是這樣的:
const source = new Observable((observer) => {
let i = 0;
const timer = setInterval(() => {
observer.next(++i);
}, 1000);
return function unsubscribe() {
clearInterval(timer);
};
});
const subscription = source.pipe(
map((i) => ++i),
map((i) => i * 10)
).subscribe({
next: (v) => console.log(v),
error: (err) => console.error(err),
complete: () => console.log('complete'),
});
setTimeout(() => {
subscription.unsubscribe();
}, 4500);
複製程式碼
我們通過 Observable 建立了一個事件源,每秒發出一個事件,這些事件會經過管道的處理再傳遞給 Observer,管道的組成是兩個 map 操作符,對資料做了 + 1 和 * 10 的處理。
Observer 接收到傳遞過來的資料,做了列印,還對錯誤和結束時的事件做了處理。此外,Observable 提供了取消訂閱時的處理邏輯,當我們在 4.5s 取消訂閱時,就可以清除定時器。
使用 RxJS 基本就是這個流程,那它是怎麼實現的呢?
80 行程式碼實現 RxJS
先從事件源開始,實現 Observable:
觀察下它的特點:
- 它接收一個回撥函式,裡面可以呼叫 next 來傳輸資料。
- 它有 subscribe 方法可以用來新增 Observer 的訂閱,返回 subscription
- 它可以在回撥函式裡返回 unsbscribe 時的處理邏輯
- 它有 pipe 方法可以傳入操作符
我們按照這些特點來實現下:
首先,Observable 的建構函式要接收回調函式 _subscribe,但是不是立刻呼叫,而是在 subscribe 的時候才呼叫:
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe() {
this._subscribe();
}
}
複製程式碼
回撥函式的引數是有 next、error、complete 方法的物件,用於傳遞事件:
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const subscriber = new Subscriber(observer);
this._subscribe(subscriber);
}
}
class Subscriber{
constructor(observer) {
super();
this.observer = observer;
this.isStopped = false;
}
next(value) {
if (this.observer.next && !this.isStopped) {
this.observer.next(value);
}
}
error(value) {
this.isStopped = true;
if (this.observer.error) {
this.observer.error(value);
}
}
complete() {
this.isStopped = true;
if (this.observer.complete) {
this.observer.complete();
}
if (this.unsubscribe) {
this.unsubscribe();
}
}
}
複製程式碼
這樣,在回撥函式裡面就可以呼叫 next、error、complete 方法了:
此外,回撥函式的返回值是 unsbscribe 時的處理邏輯,要收集起來,在取消訂閱時呼叫:
class Subscription {
constructor() {
this._teardowns = [];
}
unsubscribe() {
this._teardowns.forEach((teardown) => {
typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
});
}
add(teardown) {
if (teardown) {
this._teardowns.push(teardown);
}
}
}
複製程式碼
提供 unsubscribe 方法用於取消訂閱,_teardowns 用於收集所有的取消訂閱時的回撥,在 unsubscribe 時呼叫所有 teardown 回撥。
這段邏輯比較通用,可以作為 Subscriber 的父類。
然後,在 Observable 裡呼叫 add 來新增 teardown,並且返回 subscription(它有 unsubscribe 方法):
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const subscriber = new Subscriber(observer);
subscriber.add(this._subscribe(subscriber));
return subscriber;
}
}
class Subscriber extends Subscription {
constructor(observer) {
super();
this.observer = observer;
this.isStopped = false;
}
next(value) {
if (this.observer.next && !this.isStopped) {
this.observer.next(value);
}
}
error(value) {
this.isStopped = true;
if (this.observer.error) {
this.observer.error(value);
}
}
complete() {
this.isStopped = true;
if (this.observer.complete) {
this.observer.complete();
}
if (this.unsubscribe) {
this.unsubscribe();
}
}
}
class Subscription {
constructor() {
this._teardowns = [];
}
unsubscribe() {
this._teardowns.forEach((teardown) => {
typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
});
}
add(teardown) {
if (teardown) {
this._teardowns.push(teardown);
}
}
}
複製程式碼
這樣,我們就實現了 Observable 和 Observer,只寫了 50 行程式碼。先來測試下:
const source = new Observable((observer) => {
let i = 0;
const timer = setInterval(() => {
observer.next(++i);
}, 1000);
return function unsubscribe() {
clearInterval(timer);
};
});
const subscription = source.subscribe({
next: (v) => console.log(v),
error: (err) => console.error(err),
complete: () => console.log('complete'),
});
setTimeout(() => {
subscription.unsubscribe();
}, 4500);
複製程式碼
Observer 監聽到了 Observable 傳遞過來的 1、2、3、4 的資料,因為在 4.5s 時取消了訂閱,所以後面就不再有資料了。
我們用 50 行實現了基礎的 RxJS!
當然,最精髓的 operator 還沒有實現,接下來繼續完善。
我們給 Observable 新增 pipe 方法,它會呼叫傳入的 operator,並且上個的結果是下個的輸入,這樣就串起來了,也就是管道的概念:
class Observable {
constructor(_subscribe) {
//...
}
subscribe(observer) {
//...
}
pipe(...operations) {
return pipeFromArray(operations)(this);
}
}
function pipeFromArray(fns) {
if (fns.length === 0) {
return (x) => x;
}
if (fns.length === 1) {
return fns[0];
}
return (input) => {
return fns.reduce((prev, fn) => fn(prev), input);
};
}
複製程式碼
當傳入的引數是 0 個的時候,就直接返回之前的 Observable,1 個的時候直接返回,否則就通過 reduce 的方式串聯起來,組成管道。
operator 的實現就是監聽上一個 Observable,返回一個新的。
比如 map 的實現,就是傳入 project 對 value 做處理,把結果用 next 傳下去:
function map(project) {
return (observable) => new Observable((subscriber) => {
const subcription = observable.subscribe({
next(value) {
return subscriber.next(project(value));
},
error(err) {
subscriber.error(err);
},
complete() {
subscriber.complete();
},
});
return subcription;
});
}
複製程式碼
這樣我們就實現了 operator,來測試下:
我們呼叫了 pipe 方法,使用兩個 map 操作符來組織處理流程,對資料做了 +1 和 *10 的處理。
所以,Observable 傳遞過來的 1、2、3、4 傳遞給 Observer 的時候就變成了 20、30、40、50。
至此,我們實現了 RxJS 的 Observable、Observer、Subscription、operator 等概念,是一個簡易版 RxJS 了。只用了 80 行程式碼。
再來看最開始的那些理念:
為什麼叫做響應式呢?
因為是對事件源做監聽和一系列處理的,這種程式設計模式就叫做響應式。
為什麼叫函式式呢?
因為每一步 operator 都是純函式,返回一個新的 Observable,這符合函式式的不可變,修改後返回一個新的的理念。
為什麼叫流呢?
因為一個個事件是動態產生和傳遞的,這種資料的動態產生和傳遞就可以叫做流。
完整程式碼如下:
function pipeFromArray(fns) {
if (fns.length === 0) {
return (x) => x;
}
if (fns.length === 1) {
return fns[0];
}
return (input) => {
return fns.reduce((prev, fn) => fn(prev), input);
};
}
class Subscription {
constructor() {
this._teardowns = [];
}
unsubscribe() {
this._teardowns.forEach((teardown) => {
typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
});
}
add(teardown) {
if (teardown) {
this._teardowns.push(teardown);
}
}
}
class Subscriber extends Subscription {
constructor(observer) {
super();
this.observer = observer;
this.isStopped = false;
}
next(value) {
if (this.observer.next && !this.isStopped) {
this.observer.next(value);
}
}
error(value) {
this.isStopped = true;
if (this.observer.error) {
this.observer.error(value);
}
}
complete() {
this.isStopped = true;
if (this.observer.complete) {
this.observer.complete();
}
if (this.unsubscribe) {
this.unsubscribe();
}
}
}
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const subscriber = new Subscriber(observer);
subscriber.add(this._subscribe(subscriber));
return subscriber;
}
pipe(...operations) {
return pipeFromArray(operations)(this);
}
}
function map(project) {
return (observable) => new Observable((subscriber) => {
const subcription = observable.subscribe({
next(value) {
return subscriber.next(project(value));
},
error(err) {
subscriber.error(err);
},
complete() {
subscriber.complete();
},
});
return subcription;
});
}
const source = new Observable((observer) => {
let i = 0;
const timer = setInterval(() => {
observer.next(++i);
}, 1000);
return function unsubscribe() {
clearInterval(timer);
};
});
const subscription = source.pipe(
map((i) => ++i),
map((i) => i * 10)
).subscribe({
next: (v) => console.log(v),
error: (err) => console.error(err),
complete: () => console.log('complete'),
});
setTimeout(() => {
subscription.unsubscribe();
}, 4500);
複製程式碼
總結
為了理解 RxJS 的響應式、函式式、流等理念,我們實現了簡易版的 RxJS。
我們實現了 Observable、Observer、Subscription 等概念,完成了事件的產生和訂閱以及取消訂閱。
接著又實現了 operator 和 pipe,每個 operator 返回一個新的 Observable,對資料做層層處理。
寫完以後,我們能更清晰的理解響應式、函式式、流等理念在 RxJS 裡是怎麼體現的。
實現簡易版 RxJS,只需要 80 行程式碼。
最後
如果你覺得此文對你有一丁點幫助,點個贊。或者可以加入我的開發交流群:1025263163相互學習,我們會有專業的技術答疑解惑
如果你覺得這篇文章對你有點用的話,麻煩請給我們的開源專案點點star:http://github.crmeb.net/u/defu不勝感激 !
PHP學習手冊:https://doc.crmeb.com
技術交流論壇:https://q.crmeb.com
- 遵循Promises/A 規範,深入分析Promise實現細節 | 通過872測試樣例
- 80 行程式碼實現簡易 RxJS
- 前後端分離專案,如何解決跨域問題?
- springboot中攔截並替換token來簡化身份驗證
- 15 行程式碼在 wangEditor v5 使用數學公式
- Java執行緒池必知必會
- EdgeDB 架構簡析
- TS 型別體操:圖解一個複雜高階型別
- 基於babel的埋點工具簡單實現及思考
- 使用craco對cra專案進行構建優化
- Netty核心概念之ChannelHandler&Pipeline&ChannelHandlerContext
- 理解python非同步程式設計與簡單實現asyncio
- Mycat 作為代理服務端的小知識點
- 一文吃透 React Expiration Time
- 前端模組化詳解
- Java必備主流技術流程圖
- 【建議使用】告別if,Java超好用引數校驗工具類
- MySQL模糊查詢再也不用like %了
- Java 8 的Stream流那麼強大,你知道它的原理嗎
- Vue SEO的四種方案