rxjs全接触

RxJS是用于通过使用观察序列构成异步和基于事件的程序库。它提供了一个核心类型——Observable (Observer, Schedulers, Subjects)和数组#其他(map, filter, reduce, every, etc),以允许处理异步事件集合。

一、rxjs内容

ReactiveX结合观察者模式与迭代器模式和函数式编程与收藏,以填补管理事件序列的理想方法的需要。

在RxJS其中解决异步事件管理的基本概念是:

1. Observable

可观察对象,代表未来值或事件的可调用集合的想法. observer是一个由回调函数组成的对象,键名分别为nexterrorcomplete,以此接受Observable推送的不同类型的通知。

1
2
3
4
5
  var observer = {
    next: x => console.log('Observer got a next value: ' + x),
    error: err => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification'),
  };

调用Observer逻辑,只需在subscribe(订阅)Observable后将Observer传入:observable.subscribe(observer);

2. Observer

观察者,也被称为称为消费者,是回调的集合,它知道如何监听由观测交付价值

3. Subscription

Subscription是一个代表可以终止资源的对象,表示一个Observable的执行过程。Subscription有一个重要的方法:unsubscribe。这个方法不需要传入参数,调用后便会终止相应的资源。

4. Operators

操作者,是一种纯函数,能够使用函数式编程风格处理集合,具体操作有map, filter, concat, flatMap,等

5. Subject

在RxJS中,Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。普通的Observable并不具备多路推送的能力(每一个Observer都有自己独立的执行环境),而Subject可以共享一个执行环境。Subject是一种可以多路推送的可观察对象。与EventEmitter类似,Subject维护着自己的Observer。 每一个Subject都是一个Observable(可观察对象) 对于一个Subject,你可以订阅(subscribe)它,Observer会和往常一样接收到数据。从Observer的视角看,它并不能区分自己的执行环境是普通Observable的单路推送还是基于Subject的多路推送。 每一个Subject也可以作为Observer(观察者) Subject同样也是一个由next(v),error(e),和 complete()这些方法组成的对象。调用next(theValue)方法后,Subject会向所有已经在其上注册的Observer多路推送theValue。

6. Schedulers

调度器,是用于控制并发的中心分发器,当计算发生在setTimeout or requestAnimationFrame 时,允许我们来进行协调。

7、推和拉

拉和推是两个不同的协议如何数据生产者可以与数据消费者。

拉 数据消费者可以决定什么时候去接收数据提供者的数据。数据提供者自身并不知道什么时候把数据交给数据消费者。 每个JavaScript函数是一个拉式系统。该功能是数据的生产者,而调用该函数的代码由“揪”出了消费它单从它调用的返回值。

推 在推送系统,生产者确定何时发送数据到消费者。消费者是不知道什么时候会收到的数据。 在JavaScript中目前Promises是最常用的推系统类型。 一个Promises(提供者)传递了一个处理好的值然后注册一个回调(消费者)。但是跟方法不同,它是应许,负责确定正是当值被“推”到了回调。

二、相关api

1. scan()

使用RxJS您隔离的状态。

1
2
3
4
  var button = document . querySelector ( 'button' );
  Rx . Observable . fromEvent ( button ,  'click' )
    . scan ( count => count +  1 ,  0 )
    . subscribe ( count => console . log ( 'Clicked {$ count}次' ));

2. subscribe()

异步函数,可以使用一个回调

1
2
3
var button = document . querySelector ( 'button' );
Rx . Observable . fromEvent ( button ,  'click' )
  . subscribe (()  => console . log ( 'Clicked!' ));

3. throttle()

4. map()

每秒最多点击次数

1
2
3
4
5
6
  var button = document . querySelector ( 'button' );
  Rx . Observable . fromEvent ( button ,  'click' )
    . throttle ( 1000 )
    . map ( event  =>  event . clientX )
    . scan (( count , clientX )  => count + clientX ,  0 )
    . subscribe ( count => console . log ( count ));

三、具体使用

1. Observable

我们定义一个observable,前3个是步同执行。第四个是1秒后执行(异步),执行完4之后整个observable才算完成。

1
2
3
4
5
6
7
8
9
  var observable =  Rx . Observable . create ( function  ( observer )  {
    observer . next ( 1 );
    observer . next ( 2 );
    observer . next ( 3 );
    setTimeout (()  =>  {
      observer . next ( 4 );
      observer . complete ();
    },  1000 );
  });

2.subscribe

为了能够看到这些值,我们需要订阅(subscribe)它。

1
2
3
4
5
6
7
  console.log('just before subscribe');
  observable.subscribe({
    next: x => console.log('got value ' + x),
    error: err => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done'),
  });
  console.log('just after subscribe');

结果如下

1
2
3
4
5
6
7
  just before subscribe
  got value 1
  got value 2
  got value 3
  just after subscribe
  got value 4
  done

通过结果我们可以观察到,前三个执行之后就直接跳出订阅范围,等1秒钟之后才执行4,然后才完成。

3、amb方法

方法定义 [Rx.Observable.amb(...args)] 作用 从一系列流中,订阅最先发射的值的可观察对象并忽略其他的可观察对象。 参数 args (Array|arguments):方法参数为多个可观察对象(流),或者是Promise对象,对象间存在竞争关系。 返回值 (Observable) :方法返回呈竞争态的多个可观察对象中,首先发射的可观察对象。 总结 简单的说,amb()像一个多路电闸,一次仅能构建一条通路:

1
2
3
4
5
6
7
8
9
| | | | | | | |
A B C D E F G H
| | | | | | | |
     \
      \   开关臂
       \   
       |
      主线
       |

实例

HTML

1
2
3
4
<body>
  <input id="input1" type="text">
  <input id="input2" type="text">
</body>

JS

1
2
3
4
5
6
7
8
// 取元素
var input1 = $('#input1');
var input2 = $('#input2');
// 定义事件流
var event1 = Rx.Observable.fromEvent(input1, 'click').map(()=>'one')
var event2 = Rx.Observable.fromEvent(input2, 'click').map(()=>'two');
//传入事件流
var source = Rx.Observable.amb(event1,event2);

上面例子中 amb()中传入了两个点击事件流。 事件流1,会在点击后发射字符串one; 事件流2,会在点击后发射字符串two; 初始情况下,产生事件流1之后,事件流2不会再被输出; 反之亦然,我们可以订阅amb()产生的结果流:amb()操作符演示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
var subscription = source.subscribe(
    function (x) {
        console.log(x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

使用场景 1:秒杀

1
2
3
4
5
6
7
8
Observable.amb(
    用户A的拍下请求,
    用户B的拍下请求,
    用户C的拍下请求,
    ...
).subscribe(function(user) {
    执行购买逻辑,创建订单,打开支付工具
})

使用场景 1:通话

1
2
3
4
5
6
7
8
Observable.amb(
    A来电,
    B来电,
    C来电,
    ...
).subscribe(function(call) {
    通话
})

4. case方法

方法定义 [Rx.Observable.case(selector, sources, [elseSource|scheduler])]

作用 选择序列中特定可观察对象进行订阅,在特定可观察对象不存在的情况下,返回传入的默认可观察对象。

参数 selector (Function): 返回键的字符串的函数,键用以与sources中的键名进行比较。 sources (Object): 一个包含可观察对象的Javascript对象。 [elseSource|scheduler] (Observable | Scheduler):当selector无法匹配sources时,该对象被默认返回。 如果没有明确指定,将返回附加了指定scheduler的Rx.Observabe.empty 对象。

返回值 (Observable): 返回值为经过选择后的Observable(可观察对象)。

实例

1
2
3
4
5
6
7
8
9
  var sources = {
    hello: Rx.Observable.just('clx'),
    world: Rx.Observable.just('wxq')
  };
  var subscription = Rx.Observable.case(()=>"hello", sources, Rx.Observable.empty())

  subscription.subscribe(function(x) {
    console.log(x)
  })

实例中,匿名函数()=>“hello"指定需要在sources中返回的可观察对象的键名为"hello”,命令行最终输出"clx",点击进入case()实例

适用场景 针对表单进行校验,校验用户的手机号邮箱是否和服务器记录重复,将所有校验封装在validate对象中结构更为合理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  var validate = {
      "mobile": Observable.return('123-566-789-01'),
      "email": Observable.return('JonSnow@company.com')
  };
  var emptyObserable = Observable.empty();
  validate.case(()=>'mobile', validate, empty)
      .subscribe(function(mobile){
          // 验证手机号码是否重复
      })
  validate.case(()=>'email', validate, empty)
      .subscribe(function(email){
          // 验证用户邮箱是否重复
      })

4. catch()方法

方法定义 Rx.Observable.catch(…args)

作用 序列中可观察对象因为异常而被终止后,继续订阅序列中的其他可观察对象。

参数 args (Array | arguments): 可观察对象序列。

返回值 (Observable): 可观察对象序列中能够正确终止,不抛出异常的第一个可观察对象。

实例

1
2
3
4
5
6
7
8
9
  var obs1 = Rx.Observable.throw(new Error('error'));
  var obs2 = Rx.Observable.return(42);

  var source = Rx.Observable.catch(obs1, obs2);

  var subscription = source.subscribe(
    x => console.log(`onNext: ${x}`),
    e => console.log(`onError: ${e}`),
    () => console.log('onCompleted'));

手机验证码实例 这样,用户能够收到验证码并成功验证的几率大大增加。

1
2
3
4
5
6
7
8
  var service1 = Observable.create("服务提供商#1");
  var service2 = Observable.create("服务提供商#1");

  Observable.catch(service1, service2).subscribe({
      ()=>console.log('succeed'),
      ()=>console.log('所有验证服务均不可用')
      ()=>console.log('completed')
  })

5. combineLatest方法

方法定义 Rx.Observable.combineLatest(…args, [resultSelector])

作用 通过处理函数总是将指定的可观察对象序列中最新发射的值合并为一个可观察对象。

参数 args (arguments | Array): 一系列可观察对象或可观察对象的数组。 [resultSelector] (Function): 在所有可观察对象都发射值后调用的处理函数。

返回值 (Observable): 由传入的可观察序列经过处理函数合并后的结果组成的可观察序列。

实例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  var colors = ["紫色","黄色","蓝色","黑色"];
  var shapes = ["小星星","圆形","三角形","正方形","心形","五边形"];
  var source1 = Rx.Observable.interval(3000)
    .map(()=>colors.pop());
  var source2 = Rx.Observable.interval(2000)
    .map(()=>shapes.pop());

  var combined = Rx.Observable.combineLatest(source1, source2, function(x, y){
    return x + "的" + y;
  }).take(8);

  combined.subscribe((shaped)=>console.log(shaped));
署名 - 非商业性使用 - 禁止演绎 4.0