Concurrency In Python 简明教程

Reactive Programming

响应式编程是一种处理数据流和变化传播的编程范例。这意味着当数据流由一个组件发出时,变化将由响应式编程库传播到其他组件。变化的传播将持续到它达到最终接收器。事件驱动和响应式编程之间的区别在于,事件驱动编程围绕事件进行,而响应式编程围绕数据进行。

ReactiveX or RX for reactive programming

ReactiveX 或 Raective Extension 是响应式编程最著名的实现。ReactiveX 的工作取决于以下两个类 −

Observable class

此类是数据流或事件的源,它打包传入数据以便可以将数据从一个线程传递到另一个线程。它不会提供数据,直到某些观察者订阅它。

Observer class

此类消耗由 observable 发出的数据流。可观察对象可以有多个观察者,每个观察者都会接收发出的每个数据项。观察者可以通过订阅可观察对象接收三种事件 −

  1. on_next() event − 它表明数据流中存在一个元素。

  2. on_completed() event − 它表明发出的数据已结束,不再有更多条目。

  3. on_error() event − 它还表明发出的数据已结束,但当 observable 抛出错误时。

RxPY – Python Module for Reactive Programming

RxPY 是一个可用于响应式编程的 Python 模块。我们需要确保安装了该模块。可以使用以下命令安装 RxPY 模块 −

pip install RxPY

Example

以下是使用 RxPY 模块及其类 ObservableObserve for 响应式编程的 Python 脚本。基本上有两个类 −

  1. get_strings() − 用于从观察者获取字符串。

  2. PrintObserver() − 用于从观察者打印字符串。它使用了观察者类的全部三个事件。它还使用 subscribe() 类。

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

Output

Received Ram
Received Mohan
Received Shyam
Finished

PyFunctional library for reactive programming

*PyFunctional*是另一个可用于响应式编程的 Python 库。它使我们能够使用 Python 编程语言创建函数程序。它很有用,因为它允许我们使用链接函数操作符创建数据管道。

Difference between RxPY and PyFunctional

这两个库都用于响应式编程并以类似的方式处理流,但它们之间的主要区别取决于处理数据的方式。 RxPY 处理系统中的数据和事件,而 PyFunctional 专注于使用函数编程范例转换数据。

Installing PyFunctional Module

我们需要在使用此模块之前安装它。可以使用 pip 命令按照如下方式进行安装 −

pip install pyfunctional

Example

以下示例使用 the PyFunctional 模块及其 seq 类,它们充当我们可以迭代和操作的流对象。在此程序中,它使用将每个值加倍的 lamda 函数映射序列,然后过滤 x 大于 4 的值,最后将序列约简为所有剩余值的总和。

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

Output

Result: 6