Concurrency In Python 简明教程

Reactive Programming

响应式编程是一种处理数据流和变化传播的编程范例。这意味着当数据流由一个组件发出时,变化将由响应式编程库传播到其他组件。变化的传播将持续到它达到最终接收器。事件驱动和响应式编程之间的区别在于,事件驱动编程围绕事件进行,而响应式编程围绕数据进行。

Reactive programming is a programming paradigm that deals with data flows and the propagation of change. It means that when a data flow is emitted by one component, the change will be propagated to other components by reactive programming library. The propagation of change will continue until it reaches the final receiver. The difference between event-driven and reactive programming is that event-driven programming revolves around events and reactive programming revolves around data.

ReactiveX or RX for reactive programming

ReactiveX 或 Raective Extension 是响应式编程最著名的实现。ReactiveX 的工作取决于以下两个类 −

ReactiveX or Raective Extension is the most famous implementation of reactive programming. The working of ReactiveX depends upon the following two classes −

Observable class

此类是数据流或事件的源,它打包传入数据以便可以将数据从一个线程传递到另一个线程。它不会提供数据,直到某些观察者订阅它。

This class is the source of data stream or events and it packs the incoming data so that the data can be passed from one thread to another. It will not give data until some observer subscribe to it.

Observer class

此类消耗由 observable 发出的数据流。可观察对象可以有多个观察者,每个观察者都会接收发出的每个数据项。观察者可以通过订阅可观察对象接收三种事件 −

This class consumes the data stream emitted by observable. There can be multiple observers with observable and each observer will receive each data item that is emitted. The observer can receive three type of events by subscribing to observable −

  1. on_next() event − It implies there is an element in the data stream.

  2. on_completed() event − It implies end of emission and no more items are coming.

  3. on_error() event − It also implies end of emission but in case when an error is thrown by observable.

RxPY – Python Module for Reactive Programming

RxPY 是一个可用于响应式编程的 Python 模块。我们需要确保安装了该模块。可以使用以下命令安装 RxPY 模块 −

RxPY is a Python module which can be used for reactive programming. We need to ensure that the module is installed. The following command can be used to install the RxPY module −

pip install RxPY

Example

以下是使用 RxPY 模块及其类 ObservableObserve for 响应式编程的 Python 脚本。基本上有两个类 −

Following is a Python script, which uses RxPY module and its classes Observable and Observe for reactive programming. There are basically two classes −

  1. get_strings() − for getting the strings from observer.

  2. PrintObserver() − for printing the strings from observer. It uses all three events of observer class. It also uses subscribe() class.

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

Output

Received Ram
Received Mohan
Received Shyam
Finished

PyFunctional library for reactive programming

*PyFunctional*是另一个可用于响应式编程的 Python 库。它使我们能够使用 Python 编程语言创建函数程序。它很有用,因为它允许我们使用链接函数操作符创建数据管道。

*PyFunctional*is another Python library that can be used for reactive programming. It enables us to create functional programs using the Python programming language. It is useful because it allows us to create data pipelines by using chained functional operators.

Difference between RxPY and PyFunctional

这两个库都用于响应式编程并以类似的方式处理流,但它们之间的主要区别取决于处理数据的方式。 RxPY 处理系统中的数据和事件,而 PyFunctional 专注于使用函数编程范例转换数据。

Both the libraries are used for reactive programming and handle the stream in similar fashion but the main difference between both of them depends upon the handling of data. RxPY handles data and events in the system while PyFunctional is focused on transformation of data using functional programming paradigms.

Installing PyFunctional Module

我们需要在使用此模块之前安装它。可以使用 pip 命令按照如下方式进行安装 −

We need to install this module before using it. It can be installed with the help of pip command as follows −

pip install pyfunctional

Example

以下示例使用 the PyFunctional 模块及其 seq 类,它们充当我们可以迭代和操作的流对象。在此程序中,它使用将每个值加倍的 lamda 函数映射序列,然后过滤 x 大于 4 的值,最后将序列约简为所有剩余值的总和。

Following example uses the PyFunctional module and its seq class which act as the stream object with which we can iterate and manipulate. In this program, it maps the sequence by using the lamda function that doubles every value, then filters the value where x is greater than 4 and finally it reduces the sequence into a sum of all the remaining values.

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

Output

Result: 6