##RX.js

##一、函数式编程-通用函数式

1.ForEach

1
2
var arr = ['111','222'];
arr.forEacj(item => console.log(item));

2.map

1
2
3
4
5
6
7
Array.prototype.map = function(callback) {
var result;
this.forEach((element,index) => {
result.push(callback(element,index))
});
return result;
}

3.filter

1
2
3
4
5
6
7
8
9
Array.prototype.filter = function(callback) {
var result = [];
this.forEach((ele,index) => {
if(callback(ele){
result.push(ele)
})
})
return result;
}

4.concatAll

合并二维数组

1
2
3
4
5
6
7
8
9
10
Array.prototype.concatAll = function() {
var result = [];
this.forEach((array) => {
result.push.apply(result,array);
//result.push(...array) es6写法
//result.push.call(result,...array)
});
return result;

}

##二、Observable的基本建立

create of from fromEvent fromPromise never empty throw interval timer 以上为Observable的实例方法creation operator

1.create

createObservable的基本建立方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var observable = Rx.Observable
.create((observer) => {
observer.next('111');
observer.next('222);
setTimeout(function () {
observer.next('333');
})
});
console.log('start')
observable.subscribe(function (val) {
console.log(val);
});
console.log('end')
//Observable 相当于推送事件可以同时推送同步和异步时间并且可以暂停
//上面代码会输出:
start
111
222
end
333
compelte

2.of

当同步传值时候可以直接使用of更方便简洁

1
2
3
4
5
6
7
8
9
10
11
var observable = Rx.Observable
.of('111','222);
observable.subscribe({
value => console.log(vale);
error => console.log(error);
() => console.log('compelte)'
})
//以上程序打印出:
111
222
compelte

3.from

fromof差不多from参数为数组, set, WeakSet, Iterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
var arr = ['1','2','3'];
var source = Rx.Observable.from(arr);
observable.subscribe({
value => console.log(vale);
error => console.log(error);
() => console.log('compelte)'
})
//以上程序打印出:
111
222
compelte
//**from也可以接受字符串,例如"你好"会打印出:"你","好","compelte"**
//也可以传入promise
var source = Rx.Observable
.from(new Promise((resolve, reject) => {
setTimeout(() => {
resolve('Hello RxJS!');
},3000)
}))

source.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});

// Hello RxJS!
// complete!

如果promise 返回的是resolve则执行next,如果返回reject执行error,这里使用formPromise返回的结果也是一样的

4.fromEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
var source= Rx.observable.fromEvent(document.body,'click');
source.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});
//打印出mouseEvent()

fromEvent传入的第一个参数为dom,第二个参数为需要监听事件的名称

补充:fromEventPattern

要用Event来建立Observable实例还有另一个方法fromEventPattern,这个方法是给类事件使用。所谓的类事件就是指其行为跟事件相像,同时具有注册监听及移除监听两种行为,就像DOM EventaddEventListenerremoveEventListener一样!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Producer {
constructor() {
this.listenList = [];
}
addListener(listener) {
if(typeof listenner === 'function') {
this.listenList.push(listenner);
} else {
console.log('listenner 必须是方法')
}
}
removeListener(listener) {
this.listerList.splice(this.listerList.indexOf(listener),1)
}
norify(message) {
this.listenList.forEach(listener=> {
console.log(message)
})
}
}
//
var egghead = new Producer();
var souce = Rx.Observable.formEventPattern({
(handle) => egghead.addListener(handle);
(handle) => eegghead.removeListenner(handle);
});
source.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});
egghandle.notify('can i listener u')
//can i listener u

这里要注意不要直接将方法传入,避免this出错!也可以用bind来写。

1
2
3
4
5
6
Rx.Observable
.fromEventPattern(
egghead.addListener.bind(egghead),
egghead.removeListener.bind(egghead)
)
.subscribe(console.log)

5.empty

Observable可以订阅一个空的事件,不会返回任何东西会立即返回complete

6.never

Observable也可以订阅一个无穷的事件,不会返回任何东西但是会一直存在不会complete

7.thorw

此方法只会抛出错误

1
2
3
4
5
6
7
8
9
10
11
12
13
var source = Rx.Observable.thorw("thow");
source.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error)
}
});
// Throw Error: thow!

8.interval

interval必须有一个number数值,代表发出的时间间隔

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var source = Rx.Observable.interval(1000);
source.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error)
}
});
// 0
// 1
// 2

9.timer
当timer有两个参数时,第一个参数代表要发出第一个值的等待时间(ms),第二个参数代表第一次之后发送值的间隔时间,所以上面这段程式码会先等一秒送出1之后每五秒送出2, 3, 4, 5…。

timer 第一个参数除了可以是数值(Number)之外,也可以是日期(Date),就会等到指定的时间在发送第一个值。

另外timer也可以只接收一个参数
10.unsubscribe

停止订阅

###三、Observer观察者

observer分别拥有三个方法 next error complete

观察者的三个方法(method):

  • next:每当Observable 发送出新的值,next 方法就会被呼叫。

  • complete:在Observable 没有其他的资料可以取得时,complete 方法就会被呼叫,在complete 被呼叫之后,next 方法就不会再起作用。

  • error:每当Observable 内发生错误时,error 方法就会被呼叫。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    var observable = Rx.Observable
    .create((observer) => {
    observer.next('111');
    observer.next('222);
    observer.compelte()
    setTimeout(function () {
    observer.next('333');
    })
    });
    var observer = {
    next: (value) => {
    console.log(value)
    },
    error: (error) => {
    console.log(value)
    },
    complete: () => {
    console.log('compelte')
    }
    }
    observable.subscribe(observer);
    //上面代码会输出:
    111
    222
    //可以看出当compelte执行完之后的next会失效

###三、Observable的Operators方法

转换(transformation) 过滤(filter) 合并(combination)
1.map

1
2
3
var source = Rx.Observable.interval(1000);
var newest = source.map(x = x+1);
newest.subscribe(console.log())

珠宝图 Marble diagrams

1
2
3
source: -----0-----1-----2-----3--...
map(x => x + 1)
newest: -----1-----2-----3-----4--...

2.mapTo

mapTo可以吧传进来的值改成指定的值

1
2
3
4
5
6
7
var source = Rx.Observable.interval(1000);
var newest = source.mapTo(2);
newest.subscribe(console.log);
//
2
2
2

珠宝图Marble diagrams

1
2
3
source: -----0-----1-----2-----3--...
mapTo(2)
newest: -----2-----2-----2-----2--...

3.filter

1
2
3
4
5
6
7
8
var source = Rx.Observable.interval(1000);
var newest = source.filter(x => x % 2 === 0);

newest.subscribe(console.log);
// 0
// 2
// 4
// 6..

珠宝图Marble diagrams

1
2
3
source: -----0-----1-----2-----3-----4-...
filter(x => x % 2 === 0)
newest: -----0-----------2-----------4-...

4.take

take表示取前几个就结束

1
2
3
4
5
6
7
8
9
10
11
12
var source = Rx.Observable.interval(1000);
var example = source.take(3);
example.subscribe({
next:(val) => {console.log(val)};
error: (err) => {console.log(err)};
complete: () => {console.log(complete)};
})
//
0
1
2
complete

珠宝图Marble diagrams

1
2
3
source : -----0-----1-----2-----3--..
take(3)
example: -----0-----1-----2|

5.first

和take差不多取第一个就结束

1
2
3
4
5
6
7
8
9
10
11
var source = Rx.Observable.interval(1000);
var example = source.first();
example.subscribe({
next:(val) => {console.log(val)};
error: (err) => {console.log(err)};
complete: () => {console.log(complete)};
})
//
0

complete

珠宝图Marble diagrams

1
2
3
source : -----0-----1-----2-----3--..
first()
example: -----0|

6.takeUntil

直到某个事件发生以后才结束发出compelte

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var source = Rx.Observable.interval(1000);
var click = Rx.Observable.fromEvent(document.body, 'click');
var example = source.takeUntil(click);

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// complete (點擊body了

珠宝图Marble diagrams

1
2
3
4
source : -----0-----1-----2------3--
click : ----------------------c----
takeUntil(click)
example: -----0-----1-----2----|

7.concatAll

相当于把二维数组转换成一维数组

1
2
3
4
5
6
7
8
9
var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.of(1,2,3));

var example = source.concatAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});

珠宝图Marble diagrams

1
2
3
4
5
6
7
8
9
10
11
click  : ------c------------c--------

map(e => Rx.Observable.of(1,2,3))

source : ------o------------o--------
\ \
(123)| (123)|

concatAll()

example: ------(123)--------(123)------------

这里可以看到sourceobservable内部每次发送的值也是observable,这时我们用concatAll就可以把source摊平成example。

这里需要注意的是concatAll会处理source先发出来的observable,必须等到这个observable结束,才会再处理下一个source发出来的observable,让我们用下面这个范例说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var obs1 = Rx.Observable.interval(1000).take(5);
var obs2 = Rx.Observable.interval(500).take(2);
var obs3 = Rx.Observable.interval(2000).take(1);

var source = Rx.Observable.of(obs1, obs2, obs3);

var example = source.concatAll();

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 0
// 1
// 0
// complete

8.skip

省略前几个发送的元素

1
2
3
4
5
6
7
8
9
10
11
var source = Rx.Observable.interval(1000);
var example = source.skip(3);

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...

珠宝图

1
2
3
source : ----0----1----2----3----4----5--....
skip(3)
example: -------------------3----4----5--...

9.takeLast

取最后第几个值

1
2
3
4
5
6
7
8
9
10
11
var source = Rx.Observable.interval(1000).take(6);
var example = source.takeLast(2);

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 4
// 5
// complete

珠宝图

1
2
3
source : ----0----1----2----3----4----5|
takeLast(2)
example: ------------------------------(45)|

10.last

去最后一个元素

1
2
3
4
5
6
7
8
9
10
var source = Rx.Observable.interval(1000).take(6);
var example = source.last();

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 5
// complete

珠宝图

1
2
3
4
5
6
source : ----0----1----2----3----4----5|
last()
example: ------------------------------(5)|
```
11.concat
>把多个实例合并成一个

var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3)
var source3 = Rx.Observable.of(4,5,6)
var example = source.concat(source2, source3);

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log(‘Error: ‘ + err); },
complete: () => { console.log(‘complete’); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete

1
珠宝图

source : —-0—-1—-2|
source2: (3)|
source3: (456)|
concat()
example: —-0—-1—-2(3456)|

1
2
12.startWith
>表示一开始要发送的元素可以在前面插入

var source = Rx.Observable.interval(1000);
var example = source.startWith(0);

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log(‘Error: ‘ + err); },
complete: () => { console.log(‘complete’); }
});
// 0
// 0
// 1
// 2
// 3…

1
珠宝图

source : —-0—-1—-2—-3–…
startWith(0)
example: (0)—-0—-1—-2—-3–…

1
2
13.merge
>合并多个实例

var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = source.merge(source2);

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log(‘Error: ‘ + err); },
complete: () => { console.log(‘complete’); }
});
// 0
// 0
// 1
// 2
// 1
// 3
// 2
// 4
// 5
// complete

1
珠宝图

source : —-0—-1—-2|
source2: –0–1–2–3–4–5|
merge()
example: –0-01–21-3–(24)–5|

1
2
3
merge 的邏輯有點像是 OR(||),就是當兩個 observable 其中一個被觸發時都可以被處理,這很常用在一個以上的按鈕具有部分相同的行為。

例如一個影片播放器有兩個按鈕,一個是暫停(II),另一個是結束播放(口)。這兩個按鈕都具有相同的行為就是影片會被停止,只是結束播放會讓影片回到 00 秒,這時我們就可以把這兩個按鈕的事件 merge 起來處理影片暫停這件事。

var stopVideo = Rx.Observable.merge(stopButton, endButton);

stopVideo.subscribe(() => {
// 暫停播放影片
})

1
2
3
https://ithelp.ithome.com.tw/articles/10187638
14.combineLatest
>首先我們要介紹的是 combineLatest,它會取得各個 observable 最後送出的值,再輸出成一個值,我們直接看範例會比較好解釋。

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.combineLatest(newest, (x, y) => x + y);

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log(‘Error: ‘ + err); },
complete: () => { console.log(‘complete’); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete

1
珠宝图

source : —-0—-1—-2|
newest : –0–1–2–3–4–5|

combineLatest(newest, (x, y) => x + y);

example: —-01–23-4–(56)–7|

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
首先combineLatest可以接收多个observable,最后一个参数是callback function,这个callback function接收的参数数量跟合并的observable数量相同,依照范例来说,因为我们这里合并了两个observable所以后面的callback function就接收x, y两个参数,x会接收从source发送出来的值,y会接收从newest发送出来的值。

最后一个重点就是一定会等两个observable都曾有送值出来才会呼叫我们传入的callback,所以这段程式是这样运行的

- newest送出了0,但此时source并没有送出过任何值,所以不会执行callback
- source送出了0,此时newest最后一次送出的值为0,把这两个数传入callback得到0。
- newest送出了1,此时source最后一次送出的值为0,把这两个数传入callback得到1。
- newest送出了2,此时source最后一次送出的值为0,把这两个数传入callback得到2。
- source送出了1,此时newest最后一次送出的值为2,把这两个数传入callback得到3。
- newest送出了3,此时source最后一次送出的值为1,把这两个数传入callback得到4。
- source送出了2,此时newest最后一次送出的值为3,把这两个数传入callback得到5。
- source 结束,但newest 还没结束,所以example 还不会结束。
- newest送出了4,此时source最后一次送出的值为2,把这两个数传入callback得到6。
- newest送出了5,此时source最后一次送出的值为2,把这两个数传入callback得到7。
- newest 结束,因为source 也结束了,所以example 结束。
不管是source 还是newest 送出值来,只要另一方曾有送出过值(有最后的值),就会执行callback 并送出新的值,这就是combineLatest。

combineLatest 很常用在运算多个因子的结果,例如最常见的BMI 计算,我们身高变动时就拿上一次的体重计算新的BMI,当体重变动时则拿上一次的身高计算BMI,这就很适合用combineLatest 来处理!
15.zip
>在讲withLatestFrom 之前,先让我们先来看一下zip 是怎么运作的,zip 会取每个observable 相同顺位的元素并传入callback,也就是说每个observable 的第n 个元素会一起被传入callback ,这里我们同样直接用范例讲解会比较清楚

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.zip(newest, (x, y) => x + y);

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log(‘Error: ‘ + err); },
complete: () => { console.log(‘complete’); }
});
// 0
// 2
// 4
// complete

1
珠宝图

source : —-0—-1—-2|
newest : –0–1–2–3–4–5|
zip(newest, (x, y) => x + y)
example: —-0—-2—-4|

1
2
3
4
5
6
7
8
9
10
11
以我们的范例来说,zip会等到source跟newest 都送出了第一个元素,再传入callback,下次则等到source跟newest 都送出了第二个元素再一起传入callback,所以运行的步骤如下:

- newest送出了第一个值0,但此时source并没有送出第一个值,所以不会执行callback。
- source送出了第一个值0,newest之前送出的第一个值为0,把这两个数传入callback得到0。
- newest送出了第二个值1,但此时source并没有送出第二个值,所以不会执行callback。
- newest送出了第三个值2,但此时source并没有送出第三个值,所以不会执行callback。
- source送出了第二个值1,newest之前送出的第二个值为1,把这两个数传入callback得到2。
- newest送出了第四个值3,但此时source并没有送出第四个值,所以不会执行callback。
- source送出了第三个值2,newest之前送出的第三个值为2,把这两个数传入callback得到4。
- source 结束example 就直接结束,因为source 跟newest 不会再有对应顺位的值
zip 会把各个observable 相同顺位送出的值传入callback,这很常拿来做demo 使用,比如我们想要间隔100ms 送出'h', 'e', 'l', 'l', 'o',就可以这么做

var source = Rx.Observable.from(‘hello’);
var source2 = Rx.Observable.interval(100);

var example = source.zip(source2, (x, y) => x);

1
2

这里的Marble Diagram 就很简单

source : (hello)|
source2: -0-1-2-3-4-…
zip(source2, (x, y) => x)
example: -h-e-l-l-o|

1
2
3
4
5
6

这里我们利用zip 来达到原本只能同步送出的资料变成了非同步的,很适合用在建立示范用的资料。

建议大家平常没事不要乱用zip,除非真的需要。因为zip 必须cache 住还没处理的元素,当我们两个observable 一个很快一个很慢时,就会cache 非常多的元素,等待比较慢的那个observable。这很有可能造成记忆体相关的问题!
16.withLatestFrom
>withLatestFrom 运作方式跟combineLatest 有点像,只是他有主从的关系,只有在主要的observable 送出新的值时,才会执行callback,附随的observable 只是在背景下运作。让我们看一个例子

var main = Rx.Observable.from(‘hello’).zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);

var example = main.withLatestFrom(some, (x, y) => {
return y === 1 ? x.toUpperCase() : x;
});

example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log(‘Error: ‘ + err); },
complete: () => { console.log(‘complete’); }
});

1
珠宝图Marble diagrams

main : —-h—-e—-l—-l—-o|
some : –0–1–0–0–0–1|

withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x);

example: —-h—-e—-l—-L—-O|

1
2
3
4
5
6
7
8
9
10
11
withLatestFrom 会在main 送出值的时候执行callback,但请注意如果main 送出值时some 之前没有送出过任何值callback 仍然不会执行!

这里我们在main 送出值时,去判断some 最后一次送的值是不是1 来决定是否要切换大小写,执行步骤如下

- main送出了h,此时some上一次送出的值为0,把这两个参数传入callback得到h。
- main送出了e,此时some上一次送出的值为0,把这两个参数传入callback得到e。
- main送出了l,此时some上一次送出的值为0,把这两个参数传入callback得到l。
- main送出了l,此时some上一次送出的值为1,把这两个参数传入callback得到L。
- main送出了o,此时some上一次送出的值为1,把这两个参数传入callback得到O。
withLatestFrom 很常用在一些checkbox 型的功能,例如说一个编辑器,我们开启粗体后,打出来的字就都要变粗体,粗体就像是some observable,而我们打字就是main
###四、实战实现简易拖拉

const dragDOM = document.getElementById(‘drag’);
const body = document.body;

const mouseDown = Rx.Observable.fromEvent(dragDOM, ‘mousedown’);
const mouseUp = Rx.Observable.fromEvent(body, ‘mouseup’);
const mouseMove = Rx.Observable.fromEvent(body, ‘mousemove’);

mouseDown
.map(event => mouseMove.takeUntil(mouseUp))
.concatAll()
.map(event => ({ x: event.clientX, y: event.clientY }))
.subscribe(pos => {
dragDOM.style.left = pos.x + ‘px’;
dragDOM.style.top = pos.y + ‘px’;
})
`