Postgresql 中文操作指南

49.6. Logical Decoding Output Plugins #

可以在 PostgreSQL 源树的 contrib/test_decoding 子目录中找到一个示例输出插件。

An example output plugin can be found in the contrib/test_decoding subdirectory of the PostgreSQL source tree.

49.6.1. Initialization Function #

输出插件通过动态加载一个以输出插件的名称作为库基本名称的共享库来加载。使用常规库搜索路径来找到库。为了提供所需的输出插件回调并指示库实际上是一个输出插件,它需要提供一个名为__PG_output_plugin_init_的函数。此函数传递一个结构,该结构需要使用回调函数指针填充各个操作。

An output plugin is loaded by dynamically loading a shared library with the output plugin’s name as the library base name. The normal library search path is used to locate the library. To provide the required output plugin callbacks and to indicate that the library is actually an output plugin it needs to provide a function named _PG_output_plugin_init. This function is passed a struct that needs to be filled with the callback function pointers for individual actions.

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

begin_cbchange_cb_和_commit_cb_回调是必需的,而_startup_cbtruncate_cbmessage_cb、_filter_by_origin_cb_和_shutdown_cb_是可选的。如果未设置_truncate_cb_但要对_TRUNCATE_解码,则将忽略该操作。

The begin_cb, change_cb and commit_cb callbacks are required, while startup_cb, truncate_cb, message_cb, filter_by_origin_cb, and shutdown_cb are optional. If truncate_cb is not set but a TRUNCATE is to be decoded, the action will be ignored.

输出插件还可以定义函数以支持大规模正在进行的事务的流传输。stream_start_cbstream_stop_cbstream_abort_cb、_stream_commit_cb_和_stream_change_cb_是必需的,而_stream_message_cb_和_stream_truncate_cb_是可选的。如果输出插件还支持两阶段提交,则_stream_prepare_cb_也是必需的。

An output plugin may also define functions to support streaming of large, in-progress transactions. The stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb, and stream_change_cb are required, while stream_message_cb and stream_truncate_cb are optional. The stream_prepare_cb is also required if the output plugin also support two-phase commits.

输出插件还可以定义函数以支持两阶段提交,该提交允许在_PREPARE TRANSACTION_处解码操作。begin_prepare_cbprepare_cb、_commit_prepared_cb_和_rollback_prepared_cb_回调是必需的,而_filter_prepare_cb_是可选的。如果输出插件还支持大规模正在进行的事务的流传输,则_stream_prepare_cb_也是必需的。

An output plugin may also define functions to support two-phase commits, which allows actions to be decoded on the PREPARE TRANSACTION. The begin_prepare_cb, prepare_cb, commit_prepared_cb and rollback_prepared_cb callbacks are required, while filter_prepare_cb is optional. The stream_prepare_cb is also required if the output plugin also supports the streaming of large in-progress transactions.

49.6.2. Capabilities #

要解码、格式化和输出更改,输出插件可以使用大部分后端的常规基础架构,包括调用输出函数。允许只读访问关系,只要只访问由_initdb_在_pg_catalog_模式中创建的关系,或使用标记为用户提供的目录表的函数进行标记即可

To decode, format and output changes, output plugins can use most of the backend’s normal infrastructure, including calling output functions. Read only access to relations is permitted as long as only relations are accessed that either have been created by initdb in the pg_catalog schema, or have been marked as user provided catalog tables using

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

请注意,对输出插件中的用户目录表或常规系统目录表的访问必须仅通过_systable_*_ scan API完成。通过_heap_*_ scan API访问将出错。此外,禁止任何导致事务ID分配的操作。其中包括但不限于写入表、执行DDL更改和调用_pg_current_xact_id()_。

Note that access to user catalog tables or regular system catalog tables in the output plugins has to be done via the systable*_ scan APIs only. Access via the heap*_ scan APIs will error out. Additionally, any actions leading to transaction ID assignment are prohibited. That, among others, includes writing to tables, performing DDL changes, and calling pg_current_xact_id().

49.6.3. Output Modes #

输出插件的回调函数可以以几乎任意的格式向使用者传递数据。对于某些用例(比如通过 SQL 查看更改内容),以可以包含任意数据的类型(例如 bytea)返回数据非常麻烦。如果输出插件仅以服务器的编码方式输出文本数据,可以通过在 startup callback 中将 OutputPluginOptions.output_type 设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 而非 OUTPUT_PLUGIN_BINARY_OUTPUT 来声明这一点。在这种情况下,所有数据都必须以服务器的编码进行编码,以便 text 数据项可以对其进行包含。此设置将检查已启用的声明中的设置。

Output plugin callbacks can pass data to the consumer in nearly arbitrary formats. For some use cases, like viewing the changes via SQL, returning data in a data type that can contain arbitrary data (e.g., bytea) is cumbersome. If the output plugin only outputs textual data in the server’s encoding, it can declare that by setting OutputPluginOptions.output_type to OUTPUT_PLUGIN_TEXTUAL_OUTPUT instead of OUTPUT_PLUGIN_BINARY_OUTPUT in the startup callback. In that case, all the data has to be in the server’s encoding so that a text datum can contain it. This is checked in assertion-enabled builds.

49.6.4. Output Plugin Callbacks #

输出插件会通过它需要提供的各种回调收到有关正在发生的更改的通知。

An output plugin gets notified about changes that are happening via various callbacks it needs to provide.

并发事务按提交顺序解码,并且仅在 begincommit 回调之间解码属于特定事务的更改。已显式或隐式回滚的事务永远不会被解码。成功的保存点以其在该事务中执行的顺序折叠到包含它们的交易中。使用 PREPARE TRANSACTION 准备双阶段提交的事务还将在提供解码所需的输出插件回调时进行解码。正在解码的当前准备事务可能通过 ROLLBACK PREPARED 命令同时中止。在这种情况下,此事务的逻辑解码也将中止。一旦检测到中止并调用 prepare_cb 回调,此类事务的所有更改都将被跳过。因此,即使在同时中止的情况下,也会向输出插件提供足够的信息,以便在解码后正确处理 ROLLBACK PREPARED

Concurrent transactions are decoded in commit order, and only changes belonging to a specific transaction are decoded between the begin and commit callbacks. Transactions that were rolled back explicitly or implicitly never get decoded. Successful savepoints are folded into the transaction containing them in the order they were executed within that transaction. A transaction that is prepared for a two-phase commit using PREPARE TRANSACTION will also be decoded if the output plugin callbacks needed for decoding them are provided. It is possible that the current prepared transaction which is being decoded is aborted concurrently via a ROLLBACK PREPARED command. In that case, the logical decoding of this transaction will be aborted too. All the changes of such a transaction are skipped once the abort is detected and the prepare_cb callback is invoked. Thus even in case of a concurrent abort, enough information is provided to the output plugin for it to properly deal with ROLLBACK PREPARED once that is decoded.

Note

只有已安全刷新到磁盘的事务才会被解码。当 synchronous_commit 设置为 off 时,在紧随其后的 pg_logical_slot_get_changes() 中,COMMIT 可能不会立即被解码。

Only transactions that have already safely been flushed to disk will be decoded. That can lead to a COMMIT not immediately being decoded in a directly following pg_logical_slot_get_changes() when synchronous_commit is set to off.

49.6.4.1. Startup Callback #

无论准备输出的更改数量如何,只要创建复制槽或要求其流式传输更改,都会调用可选的 startup_cb 回调。

The optional startup_cb callback is called whenever a replication slot is created or asked to stream changes, independent of the number of changes that are ready to be put out.

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

当创建复制槽时,is_init 参数将为 true,否则将为 false。options 指向输出插件可以设置的选项结构:

The is_init parameter will be true when the replication slot is being created and false otherwise. options points to a struct of options that output plugins can set:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type 必须设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUTOUTPUT_PLUGIN_BINARY_OUTPUT。另请参见 Section 49.6.3。如果 receive_rewrites 为真,那么在某些 DDL 操作期间堆重写所做的更改也能调用输出插件。对于可处理 DDL 复制的插件而言,这种情况存在问题,但需要特殊处理。

output_type has to either be set to OUTPUT_PLUGIN_TEXTUAL_OUTPUT or OUTPUT_PLUGIN_BINARY_OUTPUT. See also Section 49.6.3. If receive_rewrites is true, the output plugin will also be called for changes made by heap rewrites during certain DDL operations. These are of interest to plugins that handle DDL replication, but they require special handling.

启动回调应验证 ctx→output_plugin_options 中存在的选项。如果输出插件需要有状态,它可以使用 ctx→output_plugin_private 来存储它。

The startup callback should validate the options present in ctx→output_plugin_options. If the output plugin needs to have a state, it can use ctx→output_plugin_private to store it.

49.6.4.2. Shutdown Callback #

每当以前处于活动状态的复制槽不再使用时,将调用可选的 shutdown_cb 回调,并且可以使用该回调来释放输出插件私有的资源。槽不一定会被删除,流式传输只是停止了。

The optional shutdown_cb callback is called whenever a formerly active replication slot is not used anymore and can be used to deallocate resources private to the output plugin. The slot isn’t necessarily being dropped, streaming is just being stopped.

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

49.6.4.3. Transaction Begin Callback #

每当已解码提交事务的开始时,都需要调用 begin_cb 回调。已中止的事务及其内容永远不会被解码。

The required begin_cb callback is called whenever a start of a committed transaction has been decoded. Aborted transactions and their contents never get decoded.

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

txn 参数包含有关事务的元信息,如已提交的时间戳和 XID。

The txn parameter contains meta information about the transaction, like the time stamp at which it has been committed and its XID.

49.6.4.4. Transaction End Callback #

每当已解码事务提交时,都需要调用 commit_cb 回调。如果没有已修改的行,则会在之前调用所有已修改行的 change_cb 回调。

The required commit_cb callback is called whenever a transaction commit has been decoded. The change_cb callbacks for all modified rows will have been called before this, if there have been any modified rows.

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

49.6.4.5. Change Callback #

对于事务中的每个单独行修改,都需要调用 change_cb 回调,它可能是 INSERTUPDATEDELETE。即使原始命令一次修改多行,也会对每行单独调用回调。change_cb 回调可以访问系统或用户目录表,以帮助输出行修改详细信息的过程。在解码已准备(但尚未提交)的事务或解码未提交的事务的情况下,此更改回调还可能由于此事务同时回滚而出错。在这种情况下,此中止事务的逻辑解码将正常停止。

The required change_cb callback is called for every individual row modification inside a transaction, may it be an INSERT, UPDATE, or DELETE. Even if the original command modified several rows at once the callback will be called individually for each row. The change_cb callback may access system or user catalog tables to aid in the process of outputting the row modification details. In case of decoding a prepared (but yet uncommitted) transaction or decoding of an uncommitted transaction, this change callback might also error out due to simultaneous rollback of this very same transaction. In that case, the logical decoding of this aborted transaction is stopped gracefully.

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

ctxtxn 参数的内容与 begin_cbcommit_cb 回调相同,但另外关系描述符 relation 指向行所属的关系,并且描述行修改的 struct change 已传入。

The ctx and txn parameters have the same contents as for the begin_cb and commit_cb callbacks, but additionally the relation descriptor relation points to the relation the row belongs to and a struct change describing the row modification are passed in.

Note

只能提取未记录(请参阅 UNLOGGED )并且不是临时(请参阅 TEMPORARY or TEMP )的用户定义表中的更改,才能使用逻辑解码。

Only changes in user defined tables that are not unlogged (see UNLOGGED) and not temporary (see TEMPORARY or TEMP) can be extracted using logical decoding.

49.6.4.6. Truncate Callback #

可选的 truncate_cb 回调是被 TRUNCATE 命令调用的。

The optional truncate_cb callback is called for a TRUNCATE command.

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

这些参数类似于 change_cb 回调。但是,因为 TRUNCATE 对通过外键连接的表的操作需要一起执行,此回调接收关系数组而不是单个关系。有关详细信息,请参阅 TRUNCATE 语句的说明。

The parameters are analogous to the change_cb callback. However, because TRUNCATE actions on tables connected by foreign keys need to be executed together, this callback receives an array of relations instead of just a single one. See the description of the TRUNCATE statement for details.

49.6.4.7. Origin Filter Callback #

调用可选的 filter_by_origin_cb 回调以确定是否下列放的 origin_id 数据对输出插件感兴趣。

The optional filter_by_origin_cb callback is called to determine whether data that has been replayed from origin_id is of interest to the output plugin.

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

ctx 参数的内容与其他回调相同。除了源,没有其他信息可用。要表示传入节点上的原始更改无关紧要,请返回 true,导致它们被过滤掉;否则返回 false。其他回调不会被调用以获取已过滤的交易和更改。

The ctx parameter has the same contents as for the other callbacks. No information but the origin is available. To signal that changes originating on the passed in node are irrelevant, return true, causing them to be filtered away; false otherwise. The other callbacks will not be called for transactions and changes that have been filtered away.

在实现级联或多向复制解决方案时,这很有用。按来源过滤可以防止在这种设置中将相同的更改来回复制。虽然事务和更改也携带有关来源的信息,但通过此回调进行过滤明显更有效率。

This is useful when implementing cascading or multidirectional replication solutions. Filtering by the origin allows to prevent replicating the same changes back and forth in such setups. While transactions and changes also carry information about the origin, filtering via this callback is noticeably more efficient.

49.6.4.8. Generic Message Callback #

每当解码逻辑解码消息时,都会调用可选的 message_cb 回调。

The optional message_cb callback is called whenever a logical decoding message has been decoded.

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

txn 参数包含与事务相关的元信息,例如其提交的时间戳及其 XID。不过,如果消息是非事务性的,并且尚未在记录消息的事务中分配 XID,则它可以为 NULL。lsn 有消息的 WAL 位置。transactional 表示消息是否作为事务消息发送的。类似于更改回调,在解码已准备(但尚未提交)事务或解码未提交事务的情况下,由于此事务本身的同时回滚,此消息回调也会出错。在这种情况下,此已中止事务的逻辑解码会正常停止。prefix 是任意空终止的前缀,可用于识别当前插件的感兴趣消息。最后,message 参数包含 message_size 大小的实际消息。

The txn parameter contains meta information about the transaction, like the time stamp at which it has been committed and its XID. Note however that it can be NULL when the message is non-transactional and the XID was not assigned yet in the transaction which logged the message. The lsn has WAL location of the message. The transactional says if the message was sent as transactional or not. Similar to the change callback, in case of decoding a prepared (but yet uncommitted) transaction or decoding of an uncommitted transaction, this message callback might also error out due to simultaneous rollback of this very same transaction. In that case, the logical decoding of this aborted transaction is stopped gracefully. The prefix is arbitrary null-terminated prefix which can be used for identifying interesting messages for the current plugin. And finally the message parameter holds the actual message of message_size size.

应格外小心,以确保输出插件认为有趣的前缀是唯一的。经常使用扩展名或输出插件本省的名字会是个不错的选择。

Extra care should be taken to ensure that the prefix the output plugin considers interesting is unique. Using name of the extension or the output plugin itself is often a good choice.

49.6.4.9. Prepare Filter Callback #

当调用可选的 filter_prepare_cb 回调时,此准备阶段应考虑将包含在当前两阶段提交事务中的数据作为解码考虑,还是作为 COMMIT PREPARED 时刻的常规单阶段事务稍后再考虑。若要表示应跳过解码,请返回 true;否则,请返回 false。如果未定义回调,则假定为 false(即,没有筛选,所有使用两阶段提交的事务都将分成两阶段解码)。

The optional filter_prepare_cb callback is called to determine whether data that is part of the current two-phase commit transaction should be considered for decoding at this prepare stage or later as a regular one-phase transaction at COMMIT PREPARED time. To signal that decoding should be skipped, return true; false otherwise. When the callback is not defined, false is assumed (i.e. no filtering, all transactions using two-phase commit are decoded in two phases as well).

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);

ctx 参数的内容与其他回调相同。参数 xidgid 提供两种不同的方式来识别事务。COMMIT PREPARED 或后来的 ROLLBACK PREPARED 具有这两个标识符,为输出插件提供选择要使用哪一个。

The ctx parameter has the same contents as for the other callbacks. The parameters xid and gid provide two different ways to identify the transaction. The later COMMIT PREPARED or ROLLBACK PREPARED carries both identifiers, providing an output plugin the choice of what to use.

可以对每个事务调用回调多次以进行解码,并且针对给定的 xidgid 对,它每次必须提供相同的静态答案。

The callback may be invoked multiple times per transaction to decode and must provide the same static answer for a given pair of xid and gid every time it is called.

49.6.4.10. Transaction Begin Prepare Callback #

每当已解码已准备事务的开始时,就调用必需的 begin_prepare_cb 回调。可在此回调中使用 txn 参数中的 gid 字段来检查插件是否已收到此 PREPARE,此时它可以出错或跳过事务的其余更改。

The required begin_prepare_cb callback is called whenever the start of a prepared transaction has been decoded. The gid field, which is part of the txn parameter, can be used in this callback to check if the plugin has already received this PREPARE in which case it can either error out or skip the remaining changes of the transaction.

typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);

49.6.4.11. Transaction Prepare Callback #

每当已解码准备两阶段提交的事务时,就调用必需的 prepare_cb 回调。如果有任何修改行,则所有这行修改的 change_cb 回调都会在之前被调用。可在此回调中使用 txn 参数中的 gid 字段。

The required prepare_cb callback is called whenever a transaction which is prepared for two-phase commit has been decoded. The change_cb callback for all modified rows will have been called before this, if there have been any modified rows. The gid field, which is part of the txn parameter, can be used in this callback.

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

49.6.4.12. Transaction Commit Prepared Callback #

每当已解码交易 COMMIT PREPARED 时,就调用必需的 commit_prepared_cb 回调。可在此回调中使用 txn 参数中的 gid 字段。

The required commit_prepared_cb callback is called whenever a transaction COMMIT PREPARED has been decoded. The gid field, which is part of the txn parameter, can be used in this callback.

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

49.6.4.13. Transaction Rollback Prepared Callback #

每当已解码交易 ROLLBACK PREPARED 时,就调用必需的 rollback_prepared_cb 回调。可在此回调中使用 txn 参数中的 gid 字段。可以使用参数 prepare_end_lsnprepare_time 来检查插件是否已收到此 PREPARE TRANSACTION,此时它可以应用回滚,否则,它可以跳过回滚操作。仅有 gid 是不够的,因为下游节点可以使用具有相同标识符的准备事务。

The required rollback_prepared_cb callback is called whenever a transaction ROLLBACK PREPARED has been decoded. The gid field, which is part of the txn parameter, can be used in this callback. The parameters prepare_end_lsn and prepare_time can be used to check if the plugin has received this PREPARE TRANSACTION in which case it can apply the rollback, otherwise, it can skip the rollback operation. The gid alone is not sufficient because the downstream node can have a prepared transaction with same identifier.

typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);

49.6.4.14. Stream Start Callback #

从进行中的事务中打开流式更改块时,调用必需的 stream_start_cb 回调。

The required stream_start_cb callback is called when opening a block of streamed changes from an in-progress transaction.

typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);

49.6.4.15. Stream Stop Callback #

从进行中的事务中关闭流式更改块时,调用必需的 stream_stop_cb 回调。

The required stream_stop_cb callback is called when closing a block of streamed changes from an in-progress transaction.

typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);

49.6.4.16. Stream Abort Callback #

调用必需的 stream_abort_cb 回调以中止先前流式传输的事务。

The required stream_abort_cb callback is called to abort a previously streamed transaction.

typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);

49.6.4.17. Stream Prepare Callback #

作为两阶段提交的一部分,调用 stream_prepare_cb 回调来准备先前流式传输的事务。当输出插件同时支持大规模进行中事务的流式传输和两阶段提交时,需要此回调。

The stream_prepare_cb callback is called to prepare a previously streamed transaction as part of a two-phase commit. This callback is required when the output plugin supports both the streaming of large in-progress transactions and two-phase commits.

typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

49.6.4.18. Stream Commit Callback #

调用必需的 stream_commit_cb 回调来提交先前流式传输的事务。

The required stream_commit_cb callback is called to commit a previously streamed transaction.

typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);

49.6.4.19. Stream Change Callback #

在流式更改块中发送更改时调用必需的 stream_change_cb 回调(由 stream_start_cbstream_stop_cb 调用限定)。由于事务可以在稍后的时间点中止,并且我们不会对已中止的事务解码更改,因此不会显示实际的更改。

The required stream_change_cb callback is called when sending a change in a block of streamed changes (demarcated by stream_start_cb and stream_stop_cb calls). The actual changes are not displayed as the transaction can abort at a later point in time and we don’t decode changes for aborted transactions.

typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);

49.6.4.20. Stream Message Callback #

在流式更改块中发送通用消息时调用可选的 stream_message_cb 回调(由 stream_start_cbstream_stop_cb 调用限定)。由于事务可以在稍后的时间点中止,并且我们不会对已中止的事务解码更改,因此不会显示事务消息的消息内容。

The optional stream_message_cb callback is called when sending a generic message in a block of streamed changes (demarcated by stream_start_cb and stream_stop_cb calls). The message contents for transactional messages are not displayed as the transaction can abort at a later point in time and we don’t decode changes for aborted transactions.

typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);

49.6.4.21. Stream Truncate Callback #

在流式更改块中的 TRUNCATE 命令调用可选的 stream_truncate_cb 回调(由 stream_start_cbstream_stop_cb 调用限定)。

The optional stream_truncate_cb callback is called for a TRUNCATE command in a block of streamed changes (demarcated by stream_start_cb and stream_stop_cb calls).

typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);

这些参数类似于 stream_change_cb 回调。但是,因为 TRUNCATE 对通过外键连接的表的操作需要一起执行,此回调接收关系数组而不是单个关系。有关详细信息,请参阅 TRUNCATE 语句的说明。

The parameters are analogous to the stream_change_cb callback. However, because TRUNCATE actions on tables connected by foreign keys need to be executed together, this callback receives an array of relations instead of just a single one. See the description of the TRUNCATE statement for details.

49.6.5. Functions for Producing Output #

为实际生成输出,输出插件可以在 begin_cbcommit_cbchange_cb 回调内部将数据写入 StringInfo 输出缓冲区中的 ctx→out。在写入输出缓冲区之前,必须调用 OutputPluginPrepareWrite(ctx, last_write),并且在完成向缓冲区的写入后,必须调用 OutputPluginWrite(ctx, last_write) 来执行写入。last_write 指示特定写入是否是回调的最后一次写入。

To actually produce output, output plugins can write data to the StringInfo output buffer in ctx→out when inside the begin_cb, commit_cb, or change_cb callbacks. Before writing to the output buffer, OutputPluginPrepareWrite(ctx, last_write) has to be called, and after finishing writing to the buffer, OutputPluginWrite(ctx, last_write) has to be called to perform the write. The last_write indicates whether a particular write was the callback’s last write.

以下示例演示了如何将数据输出到输出插件的使用者:

The following example shows how to output data to the consumer of an output plugin:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);