RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。更简洁地实现异步和事件驱动应用。
ReactiveX结合了Observer模式、Iterator模式和函数式编程和集合来构建一个管理事件序列的理想方式。
最新版本 7.0.0-beta.12
可组合
而Observables是专门设计用来组合异步数据流的。
弹性强(灵活)
支持单标量值,而且支持数据序列甚至是无限的数据流。
具体实现没有限制
没有对任何并发和异步库的依赖,可以用线程池、事件循环、非阻塞I/O、actors等任何技术来实现,用任何适应的风格。Observable应该永远被认为是异步的,无论底层实现是阻塞还是非阻塞的。可以随时修改实现而不影响使用。
Callbacks的问题
对于单层异步任务,简单易用。而对于需要多层嵌套的场景就难于处理了。
在RxJS中管理异步事件的基本概念如下:
需求:从0开始按顺序每秒输出一个整数
常规解决方法:
function CountCase1() {
let count = 0;
let timer = setInterval(() => {
count += 1;
console.log(`Count case1: ${count}`);
}, 1000);
}
CountCase1();
Rxjs解决方案:
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
function CountCase2() {
return interval(1000).pipe(take(1000));
}
CountCase2().subscribe((num) => console.log('Count case2: ', num));
import { Observable } from 'rxjs';
function Create() {
return Observable.create((observer: any) => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
}
Create().subscribe({
next: (x: any) => console.log(`got value${x}`),
error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
complete: () => console.log('done')
});
console.log('just after subscribe');
结论:
Subscription代表了一个一次性的资源,通常表示的是一个Observable execution。一个Subscription有一个重要的方法,unsubscribe,它不需要参数,仅仅是处理subscription的资源。在之前的RxJS版本中,Subscription被称作"Disposable"。
一个Subscription实质上是一个unsubscribe()函数,用来释放资源或者取消一个Observable executions。
import { Observable } from 'rxjs';
function Create() {
return Observable.create((observer: any) => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
}
const subscription = Create().subscribe({
next: (x: any) => console.log(`got value${x}`),
error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
complete: () => console.log('done')
});
subscription.unsubscribe();
console.log('just after subscribe');
一个RxJS Subject是一个特殊类型的Observable,它允许值可以多路广播给多个Observers。普通的Observables是单路广播(每个subscribed Observer拥有自己独立的Observable execution),Subjects是多路广播。
import { Subject } from 'rxjs';
function Create() {
return new Subject();
}
const subject = Create();
subject.subscribe({
next: (x: any) => console.log(`got value in case 1: ${x}`),
error: (err: any) => console.error(`somthing wrong occurred in case 1: ${err}`),
complete: () => console.log('in case 1 done')
})
subject.subscribe({
next: (x: any) => console.log(`got value in case 2: ${x}`),
error: (err: any) => console.error(`somthing wrong occurred in case 2: ${err}`),
complete: () => console.log('in case 2 done')
})
subject.next(1);
subject.next(2);
Opeartors是Obsrevable的方法,就像map()、filter()、merge()等。当它被调用时,它们并不改变已经存在的Observable,而是返回一个基于第一个Observable上新的Observable。
import { of, forkJoin, Observable, throwError, race } from 'rxjs';
function Create1(): Observable<string> {
return of('Operater Demo 1');
}
function Create2(): Observable<string> {
return of('Operater Demo 2');
}
function Create3(): Observable<string> {
return Observable.create((observer: any) => {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
}
返回一个字符串
const subject = Create1();
subject.subscribe({
next: (x: any) => console.log(`got string: ${x}`),
error: (err: any) => console.error(`somthing wrong occurred: ${err}`),
complete: () => console.log('done')
})
对比Promise
const subject1 = Create1();
const subject2 = Create2();
const subject3 = Create3();
console.log('start');
forkJoin([subject1, subject2]).subscribe(
(data: Array<string>) => {
data.map(item => console.log(item));
});
console.log('done');
console.log('start');
race([subject1, subject2]).subscribe(
(data: string) => {
console.log(data);
});
console.log('done');
console.log('start');
forkJoin([subject1, subject3]).subscribe({
next: (data: Array<string>) => {
data.map(item => console.log(item));
}});
console.log('done');
import { Observable, of, throwError } from 'rxjs';
import { catchError, map, mergeAll } from "rxjs/operators";
function Create(): Observable<any> {
return of([
1,
2,
() => throwError('exception.'),
3,
]).pipe(
map(val => val),
mergeAll(),
catchError(() => of('error catched.'))
);
}
var observable = Create()
var finalObserver = {
next: (x: any) => console.log('got value ' + x),
error: (err: any) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
当一个subscription开始工作或者notifications被传递,scheduler就会开始调用。
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from "rxjs/operators";
function Create(): Observable<any> {
return Observable.create((proxyObserver: any) => {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
}).pipe(
observeOn(asyncScheduler)
);
}
var observable = Create()
var finalObserver = {
next: (x: any) => console.log('got value ' + x),
error: (err: any) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
[1]
[2]
[3]
[4]
[5]
[6]
因篇幅问题不能全部显示,请点此查看更多更全内容