Postgresql 中文操作指南

49.6. Logical Decoding Output Plugins #

可以在 PostgreSQL 源树的 contrib/test_decoding 子目录中找到一个示例输出插件。

49.6.1. Initialization Function #

输出插件通过动态加载一个以输出插件的名称作为库基本名称的共享库来加载。使用常规库搜索路径来找到库。为了提供所需的输出插件回调并指示库实际上是一个输出插件,它需要提供一个名为__PG_output_plugin_init_的函数。此函数传递一个结构,该结构需要使用回调函数指针填充各个操作。

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

begin_cbchange_cb_和_commit_cb_回调是必需的,而_startup_cbtruncate_cbmessage_cb、_filter_by_origin_cb_和_shutdown_cb_是可选的。如果未设置_truncate_cb_但要对_TRUNCATE_解码,则将忽略该操作。

输出插件还可以定义函数以支持大规模正在进行的事务的流传输。stream_start_cbstream_stop_cbstream_abort_cb、_stream_commit_cb_和_stream_change_cb_是必需的,而_stream_message_cb_和_stream_truncate_cb_是可选的。如果输出插件还支持两阶段提交,则_stream_prepare_cb_也是必需的。

输出插件还可以定义函数以支持两阶段提交,该提交允许在_PREPARE TRANSACTION_处解码操作。begin_prepare_cbprepare_cb、_commit_prepared_cb_和_rollback_prepared_cb_回调是必需的,而_filter_prepare_cb_是可选的。如果输出插件还支持大规模正在进行的事务的流传输,则_stream_prepare_cb_也是必需的。

49.6.2. Capabilities #

要解码、格式化和输出更改,输出插件可以使用大部分后端的常规基础架构,包括调用输出函数。允许只读访问关系,只要只访问由_initdb_在_pg_catalog_模式中创建的关系,或使用标记为用户提供的目录表的函数进行标记即可

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

请注意,对输出插件中的用户目录表或常规系统目录表的访问必须仅通过_systable_*_ scan API完成。通过_heap_*_ scan API访问将出错。此外,禁止任何导致事务ID分配的操作。其中包括但不限于写入表、执行DDL更改和调用_pg_current_xact_id()_。

49.6.3. Output Modes #

输出插件的回调函数可以以几乎任意的格式向使用者传递数据。对于某些用例(比如通过 SQL 查看更改内容),以可以包含任意数据的类型(例如 bytea)返回数据非常麻烦。如果输出插件仅以服务器的编码方式输出文本数据,可以通过在 startup callback 中将 OutputPluginOptions.output_type 设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 而非 OUTPUT_PLUGIN_BINARY_OUTPUT 来声明这一点。在这种情况下,所有数据都必须以服务器的编码进行编码,以便 text 数据项可以对其进行包含。此设置将检查已启用的声明中的设置。

49.6.4. Output Plugin Callbacks #

输出插件会通过它需要提供的各种回调收到有关正在发生的更改的通知。

并发事务按提交顺序解码,并且仅在 begincommit 回调之间解码属于特定事务的更改。已显式或隐式回滚的事务永远不会被解码。成功的保存点以其在该事务中执行的顺序折叠到包含它们的交易中。使用 PREPARE TRANSACTION 准备双阶段提交的事务还将在提供解码所需的输出插件回调时进行解码。正在解码的当前准备事务可能通过 ROLLBACK PREPARED 命令同时中止。在这种情况下,此事务的逻辑解码也将中止。一旦检测到中止并调用 prepare_cb 回调,此类事务的所有更改都将被跳过。因此,即使在同时中止的情况下,也会向输出插件提供足够的信息,以便在解码后正确处理 ROLLBACK PREPARED

Note

只有已安全刷新到磁盘的事务才会被解码。当 synchronous_commit 设置为 off 时,在紧随其后的 pg_logical_slot_get_changes() 中,COMMIT 可能不会立即被解码。

49.6.4.1. Startup Callback #

无论准备输出的更改数量如何,只要创建复制槽或要求其流式传输更改,都会调用可选的 startup_cb 回调。

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

当创建复制槽时,is_init 参数将为 true,否则将为 false。options 指向输出插件可以设置的选项结构:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_type 必须设置为 OUTPUT_PLUGIN_TEXTUAL_OUTPUTOUTPUT_PLUGIN_BINARY_OUTPUT。另请参见 Section 49.6.3。如果 receive_rewrites 为真,那么在某些 DDL 操作期间堆重写所做的更改也能调用输出插件。对于可处理 DDL 复制的插件而言,这种情况存在问题,但需要特殊处理。

启动回调应验证 ctx→output_plugin_options 中存在的选项。如果输出插件需要有状态,它可以使用 ctx→output_plugin_private 来存储它。

49.6.4.2. Shutdown Callback #

每当以前处于活动状态的复制槽不再使用时,将调用可选的 shutdown_cb 回调,并且可以使用该回调来释放输出插件私有的资源。槽不一定会被删除,流式传输只是停止了。

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

49.6.4.3. Transaction Begin Callback #

每当已解码提交事务的开始时,都需要调用 begin_cb 回调。已中止的事务及其内容永远不会被解码。

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

txn 参数包含有关事务的元信息,如已提交的时间戳和 XID。

49.6.4.4. Transaction End Callback #

每当已解码事务提交时,都需要调用 commit_cb 回调。如果没有已修改的行,则会在之前调用所有已修改行的 change_cb 回调。

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

49.6.4.5. Change Callback #

对于事务中的每个单独行修改,都需要调用 change_cb 回调,它可能是 INSERTUPDATEDELETE。即使原始命令一次修改多行,也会对每行单独调用回调。change_cb 回调可以访问系统或用户目录表,以帮助输出行修改详细信息的过程。在解码已准备(但尚未提交)的事务或解码未提交的事务的情况下,此更改回调还可能由于此事务同时回滚而出错。在这种情况下,此中止事务的逻辑解码将正常停止。

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

ctxtxn 参数的内容与 begin_cbcommit_cb 回调相同,但另外关系描述符 relation 指向行所属的关系,并且描述行修改的 struct change 已传入。

Note

只能提取未记录(请参阅 UNLOGGED )并且不是临时(请参阅 TEMPORARY or TEMP )的用户定义表中的更改,才能使用逻辑解码。

49.6.4.6. Truncate Callback #

可选的 truncate_cb 回调是被 TRUNCATE 命令调用的。

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

这些参数类似于 change_cb 回调。但是,因为 TRUNCATE 对通过外键连接的表的操作需要一起执行,此回调接收关系数组而不是单个关系。有关详细信息,请参阅 TRUNCATE 语句的说明。

49.6.4.7. Origin Filter Callback #

调用可选的 filter_by_origin_cb 回调以确定是否下列放的 origin_id 数据对输出插件感兴趣。

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

ctx 参数的内容与其他回调相同。除了源,没有其他信息可用。要表示传入节点上的原始更改无关紧要,请返回 true,导致它们被过滤掉;否则返回 false。其他回调不会被调用以获取已过滤的交易和更改。

在实现级联或多向复制解决方案时,这很有用。按来源过滤可以防止在这种设置中将相同的更改来回复制。虽然事务和更改也携带有关来源的信息,但通过此回调进行过滤明显更有效率。

49.6.4.8. Generic Message Callback #

每当解码逻辑解码消息时,都会调用可选的 message_cb 回调。

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

txn 参数包含与事务相关的元信息,例如其提交的时间戳及其 XID。不过,如果消息是非事务性的,并且尚未在记录消息的事务中分配 XID,则它可以为 NULL。lsn 有消息的 WAL 位置。transactional 表示消息是否作为事务消息发送的。类似于更改回调,在解码已准备(但尚未提交)事务或解码未提交事务的情况下,由于此事务本身的同时回滚,此消息回调也会出错。在这种情况下,此已中止事务的逻辑解码会正常停止。prefix 是任意空终止的前缀,可用于识别当前插件的感兴趣消息。最后,message 参数包含 message_size 大小的实际消息。

应格外小心,以确保输出插件认为有趣的前缀是唯一的。经常使用扩展名或输出插件本省的名字会是个不错的选择。

49.6.4.9. Prepare Filter Callback #

当调用可选的 filter_prepare_cb 回调时,此准备阶段应考虑将包含在当前两阶段提交事务中的数据作为解码考虑,还是作为 COMMIT PREPARED 时刻的常规单阶段事务稍后再考虑。若要表示应跳过解码,请返回 true;否则,请返回 false。如果未定义回调,则假定为 false(即,没有筛选,所有使用两阶段提交的事务都将分成两阶段解码)。

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);

ctx 参数的内容与其他回调相同。参数 xidgid 提供两种不同的方式来识别事务。COMMIT PREPARED 或后来的 ROLLBACK PREPARED 具有这两个标识符,为输出插件提供选择要使用哪一个。

可以对每个事务调用回调多次以进行解码,并且针对给定的 xidgid 对,它每次必须提供相同的静态答案。

49.6.4.10. Transaction Begin Prepare Callback #

每当已解码已准备事务的开始时,就调用必需的 begin_prepare_cb 回调。可在此回调中使用 txn 参数中的 gid 字段来检查插件是否已收到此 PREPARE,此时它可以出错或跳过事务的其余更改。

typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);

49.6.4.11. Transaction Prepare Callback #

每当已解码准备两阶段提交的事务时,就调用必需的 prepare_cb 回调。如果有任何修改行,则所有这行修改的 change_cb 回调都会在之前被调用。可在此回调中使用 txn 参数中的 gid 字段。

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

49.6.4.12. Transaction Commit Prepared Callback #

每当已解码交易 COMMIT PREPARED 时,就调用必需的 commit_prepared_cb 回调。可在此回调中使用 txn 参数中的 gid 字段。

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

49.6.4.13. Transaction Rollback Prepared Callback #

每当已解码交易 ROLLBACK PREPARED 时,就调用必需的 rollback_prepared_cb 回调。可在此回调中使用 txn 参数中的 gid 字段。可以使用参数 prepare_end_lsnprepare_time 来检查插件是否已收到此 PREPARE TRANSACTION,此时它可以应用回滚,否则,它可以跳过回滚操作。仅有 gid 是不够的,因为下游节点可以使用具有相同标识符的准备事务。

typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);

49.6.4.14. Stream Start Callback #

从进行中的事务中打开流式更改块时,调用必需的 stream_start_cb 回调。

typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);

49.6.4.15. Stream Stop Callback #

从进行中的事务中关闭流式更改块时,调用必需的 stream_stop_cb 回调。

typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);

49.6.4.16. Stream Abort Callback #

调用必需的 stream_abort_cb 回调以中止先前流式传输的事务。

typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);

49.6.4.17. Stream Prepare Callback #

作为两阶段提交的一部分,调用 stream_prepare_cb 回调来准备先前流式传输的事务。当输出插件同时支持大规模进行中事务的流式传输和两阶段提交时,需要此回调。

typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

49.6.4.18. Stream Commit Callback #

调用必需的 stream_commit_cb 回调来提交先前流式传输的事务。

typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);

49.6.4.19. Stream Change Callback #

在流式更改块中发送更改时调用必需的 stream_change_cb 回调(由 stream_start_cbstream_stop_cb 调用限定)。由于事务可以在稍后的时间点中止,并且我们不会对已中止的事务解码更改,因此不会显示实际的更改。

typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);

49.6.4.20. Stream Message Callback #

在流式更改块中发送通用消息时调用可选的 stream_message_cb 回调(由 stream_start_cbstream_stop_cb 调用限定)。由于事务可以在稍后的时间点中止,并且我们不会对已中止的事务解码更改,因此不会显示事务消息的消息内容。

typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);

49.6.4.21. Stream Truncate Callback #

在流式更改块中的 TRUNCATE 命令调用可选的 stream_truncate_cb 回调(由 stream_start_cbstream_stop_cb 调用限定)。

typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);

这些参数类似于 stream_change_cb 回调。但是,因为 TRUNCATE 对通过外键连接的表的操作需要一起执行,此回调接收关系数组而不是单个关系。有关详细信息,请参阅 TRUNCATE 语句的说明。

49.6.5. Functions for Producing Output #

为实际生成输出,输出插件可以在 begin_cbcommit_cbchange_cb 回调内部将数据写入 StringInfo 输出缓冲区中的 ctx→out。在写入输出缓冲区之前,必须调用 OutputPluginPrepareWrite(ctx, last_write),并且在完成向缓冲区的写入后,必须调用 OutputPluginWrite(ctx, last_write) 来执行写入。last_write 指示特定写入是否是回调的最后一次写入。

以下示例演示了如何将数据输出到输出插件的使用者:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);