Reactive Messaging AMQP 1.0 Connector Reference Documentation

This guide is the companion to Getting Started with AMQP 1.0. It explains in more detail the configuration and usage of the AMQP connector for Reactive Messaging.

This documentation does not cover all the details of the connector. Refer to the SmallRye Reactive Messaging website for further details.

The AMQP connector allows Quarkus applications to send and receive messages using the AMQP 1.0 protocol. More details about the protocol can be found in the AMQP 1.0 specification. Note that AMQP 1.0 and AMQP 0.9.1 (implemented by RabbitMQ) are incompatible. See the Using RabbitMQ section for more details.

AMQP connector extension

To use the connector, you need to add the quarkus-messaging-amqp extension.

You can add the extension with the Quarkus tooling, or add the following dependency to your project directly:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-amqp</artifactId>
</dependency>

build.gradle
implementation("io.quarkus:quarkus-messaging-amqp")

Once the extension is added to your project, you can map channels to AMQP addresses by configuring the connector attribute:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp

Connector auto-attachment

If you have a single connector on your classpath, you can omit the connector attribute configuration. Quarkus automatically associates orphan channels with the (unique) connector found on the classpath. Orphan channels are outgoing channels without a downstream consumer or incoming channels without an upstream producer.

This auto-attachment can be disabled using:

quarkus.messaging.auto-connector-attachment=false

Configuring the AMQP Broker access

The AMQP connector connects to AMQP 1.0 brokers such as Apache ActiveMQ or Artemis. To configure the location and credentials of the broker, add the following properties to application.properties:

amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-amqp (5)

1 Configures the broker/router host name. You can do it per channel (using the host attribute) or globally using amqp-host.
2 Configures the broker/router port. You can do it per channel (using the port attribute) or globally using amqp-port. The default is 5672.
3 Configures the broker/router username if required. You can do it per channel (using the username attribute) or globally using amqp-username.
4 Configures the broker/router password if required. You can do it per channel (using the password attribute) or globally using amqp-password.
5 Instructs the prices channel to be managed by the AMQP connector.
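
As a sketch of the per-channel form mentioned in the callouts, the following fragment sets the broker location and credentials on the prices channel only, using the host, port, username and password attributes. The host name, port and credentials are placeholder values, not defaults of the connector.

```properties
# Per-channel broker settings for the `prices` channel (placeholder values)
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.host=amqp-prices
mp.messaging.incoming.prices.port=5673
mp.messaging.incoming.prices.username=prices-user
mp.messaging.incoming.prices.password=prices-secret
```

Channels that do not set these attributes would keep using the global amqp-host, amqp-port, amqp-username and amqp-password values.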

In dev mode and when running tests, Dev Services for AMQP automatically starts an AMQP broker.

Receiving AMQP messages

Let's imagine your application receives Message<Double>. You can consume the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Or, you can retrieve the Message<Double>:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}

Inbound Metadata

Messages coming from AMQP contain an instance of IncomingAmqpMetadata in the metadata.

Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});

Deserialization

The connector converts incoming AMQP Messages into Reactive Messaging Message<T> instances. T depends on the body of the received AMQP Message.

The AMQP Type System defines the supported types.

| AMQP Body Type | <T> |
| --- | --- |
| AMQP Value containing an AMQP Primitive Type | the corresponding Java type |
| AMQP Value using the Binary type | byte[] |
| AMQP Sequence | List |
| AMQP Data (with binary content) and the content-type is set to application/json | JsonObject |
| AMQP Data with a different content-type | byte[] |

If you send objects with this AMQP connector (outbound connector), they are encoded as JSON and sent as binary. The content-type is set to application/json. So, you can rebuild the object as follows:

import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@ApplicationScoped
public class Consumer {

    // Thread-safe list of the prices received so far
    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") // (1)
    public void consume(JsonObject p) { // (2)
        Price price = p.mapTo(Price.class); // (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}

1 The Price instances are automatically encoded to JSON by the connector.
2 You can receive the payload as a JsonObject.
3 Then, you can reconstruct the instance using the mapTo method.

The mapTo method uses the Quarkus Jackson mapper. See the Quarkus documentation on Jackson to learn more about the mapper configuration.

Acknowledgement

When a Reactive Messaging Message associated with an AMQP Message is acknowledged, it informs the broker that the message has been accepted.

Failure Management

If a message produced from an AMQP message is nacked, a failure strategy is applied. The AMQP connector supports six strategies:

  • fail - fail the application; no more AMQP messages will be processed (default). The AMQP message is marked as rejected.

  • accept - this strategy marks the AMQP message as accepted. The processing continues ignoring the failure. Refer to the accepted delivery state documentation.

  • release - this strategy marks the AMQP message as released. The processing continues with the next message. The broker can redeliver the message. Refer to the released delivery state documentation.

  • reject - this strategy marks the AMQP message as rejected. The processing continues with the next message. Refer to the rejected delivery state documentation.

  • modified-failed - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). The processing continues with the next message, but the broker may attempt to redeliver the message. Refer to the modified delivery state documentation.

  • modified-failed-undeliverable-here - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). It also indicates that the application cannot process the message, meaning that the broker will not attempt to redeliver the message to this node. The processing continues with the next message. Refer to the modified delivery state documentation.
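
The strategy is selected per incoming channel with the failure-strategy attribute. A minimal sketch, assuming the prices channel used earlier and choosing release so that the broker can redeliver a nacked message:

```properties
mp.messaging.incoming.prices.connector=smallrye-amqp
# One of: fail (default), accept, release, reject,
# modified-failed, modified-failed-undeliverable-here
mp.messaging.incoming.prices.failure-strategy=release
```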

Sending AMQP messages

Serialization

When sending a Message<T>, the connector converts the message into an AMQP Message. The payload is converted to the AMQP Message body.

| T | AMQP Message Body |
| --- | --- |
| primitive types or String | AMQP Value with the payload |
| Instant or UUID | AMQP Value using the corresponding AMQP Type |
| JsonObject or JsonArray | AMQP Data using a binary content. The content-type is set to application/json |
| io.vertx.mutiny.core.buffer.Buffer | AMQP Data using a binary content. No content-type set |
| Any other class | The payload is converted to JSON (using a Json Mapper). The result is wrapped into AMQP Data using a binary content. The content-type is set to application/json |

If the message payload cannot be serialized to JSON, the message is nacked.

Outbound Metadata

When sending Messages, you can add an instance of OutgoingAmqpMetadata to influence how the message is sent to AMQP. For example, you can configure the subject and properties:

OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withDurable(true)
    .withSubject("my-subject")
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

Dynamic address names

Sometimes it is desirable to select the destination of a message dynamically. In this case, do not configure the address in your application configuration file. Instead, use the outbound metadata to set the address.

For example, you can send to a dynamic address based on the incoming message:

String addressName = selectAddressFromIncomingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withAddress(addressName)
    .withDurable(true)
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

To be able to set the address per message, the connector uses an anonymous sender.

Acknowledgement

By default, the Reactive Messaging Message is acknowledged when the broker has acknowledged the message. When using routers, this acknowledgement may not be enabled. In this case, configure the auto-acknowledgement attribute to acknowledge the message as soon as it has been sent to the router.

If an AMQP message is rejected/released/modified by the broker (or cannot be sent successfully), the message is nacked.

Back Pressure and Credits

Back-pressure is handled by AMQP credits. The outbound connector requests only as many messages as it has credits for. When the number of credits reaches 0, it waits (in a non-blocking fashion) until the broker grants more credits to the AMQP sender.
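
To make the credit mechanism concrete, here is a toy model in plain Java with no AMQP library involved. It shows a sender that pulls from its upstream only while it holds credits and stops when they run out. The CreditedSender class and its method names are hypothetical and chosen for illustration; this is a simplified sketch of the idea, not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of credit-based flow control (illustrative only). */
final class CreditedSender {
    private final Iterator<String> upstream;
    private final List<String> sent = new ArrayList<>();
    private int credits;

    CreditedSender(Iterator<String> upstream) {
        this.upstream = upstream;
    }

    /** Called when the simulated broker grants credits to the sender. */
    void onCreditsGranted(int amount) {
        credits += amount;
        // Pull from upstream only while credits remain; each transfer costs one credit.
        while (credits > 0 && upstream.hasNext()) {
            sent.add(upstream.next());
            credits--;
        }
        // With 0 credits the method simply returns: nothing blocks, and
        // sending resumes on the next grant.
    }

    int credits() {
        return credits;
    }

    int sentCount() {
        return sent.size();
    }

    public static void main(String[] args) {
        CreditedSender sender = new CreditedSender(List.of("a", "b", "c").iterator());
        sender.onCreditsGranted(2);
        System.out.println("sent=" + sender.sentCount() + " credits=" + sender.credits());
        sender.onCreditsGranted(5);
        System.out.println("sent=" + sender.sentCount() + " credits=" + sender.credits());
    }
}
```

In this model the third message is not pulled from upstream until the second grant arrives, which is the behaviour the paragraph above describes.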

Configuring the AMQP address

You can configure the AMQP address using the address attribute:

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue

If the address attribute is not set, the connector uses the channel name.

To use an existing queue, you need to configure the address, container-id and, optionally, the link-name attributes. For example, if you have an Apache Artemis broker configured with:

<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>

You need the following configuration:

mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

If the queue name is not the channel name, you may need to configure the link-name attribute:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

To use a MULTICAST queue, you need to provide the FQQN (fully-qualified queue name) instead of just the name of the queue:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
# Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.address=foo::bar
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people
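
To illustrate the address-name::queue-name syntax, the following plain-Java helper splits an FQQN into its two parts. The Fqqn class is a hypothetical name used only for this sketch; the connector does not expose such a helper, and the broker is what actually interprets the address.

```java
/** Splits a value of the form address-name::queue-name (illustrative helper). */
final class Fqqn {
    final String address;
    final String queue; // null when the value is a plain address without "::"

    private Fqqn(String address, String queue) {
        this.address = address;
        this.queue = queue;
    }

    static Fqqn parse(String value) {
        int separator = value.indexOf("::");
        if (separator < 0) {
            // No queue part: the whole value is the address
            return new Fqqn(value, null);
        }
        return new Fqqn(value.substring(0, separator), value.substring(separator + 2));
    }

    public static void main(String[] args) {
        Fqqn fqqn = Fqqn.parse("foo::bar");
        System.out.println("address=" + fqqn.address + ", queue=" + fqqn.queue);
    }
}
```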

More details about the AMQP Address model can be found in the Artemis documentation.

Execution model and Blocking processing

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. However, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, use the @Blocking annotation to indicate that the processing is blocking and should not run on the caller thread.

For example, the following code illustrates how you can store incoming payloads in a database using Hibernate with Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking // run on a worker thread instead of the I/O thread
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

There are two @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

They have the same effect, so you can use either one. The first one provides more fine-grained tuning, such as the worker pool to use and whether it preserves the order. The second one, which is also used with other reactive features of Quarkus, uses the default worker pool and preserves the order.

@RunOnVirtualThread

For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.

@Transactional

If your method is annotated with @Transactional, it is automatically considered blocking, even if the method is not annotated with @Blocking.

Customizing the underlying AMQP client

The connector uses the Vert.x AMQP client underneath. More details about this client can be found on the Vert.x website.

You can customize the underlying client configuration by producing an instance of AmqpClientOptions as follows:

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // You can use the produced options to configure the TLS connection
  PemKeyCertOptions keycert = new PemKeyCertOptions()
    .addCertPath("./tls/tls.crt")
    .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tls/ca.crt");
  return new AmqpClientOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .addEnabledSaslMechanism("EXTERNAL")
        .setHostnameVerificationAlgorithm("") // Disables the hostname verification. Default is "HTTPS"
        .setConnectTimeout(30000)
        .setReconnectInterval(5000)
        .setContainerId("my-container");
}

This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the options bean using the client-options-name attribute:

mp.messaging.incoming.prices.client-options-name=my-named-options

If you experience frequent disconnections from the broker, you can also use AmqpClientOptions to set a heartbeat that keeps the AMQP connection alive. Some brokers terminate the AMQP connection after a certain idle timeout. The heartbeat value you provide is used by the Vert.x Proton client to advertise the idle timeout when opening the transport to a remote peer.

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // set a heartbeat of 30s (in milliseconds)
  return new AmqpClientOptions()
        .setHeartbeat(30000);
}

Health reporting

If you use the AMQP connector with the quarkus-smallrye-health extension, it contributes to the readiness and liveness probes. The AMQP connector reports the readiness and liveness of each channel managed by the connector. At the moment, the AMQP connector uses the same logic for the readiness and liveness checks.

To disable health reporting, set the health-enabled attribute for the channel to false. On the inbound side (receiving messages from AMQP), the check verifies that the receiver is attached to the broker. On the outbound side (sending records to AMQP), the check verifies that the sender is attached to the broker.
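
For instance, a minimal sketch that turns off health reporting for the prices incoming channel only (the channel name is reused from the earlier examples):

```properties
mp.messaging.incoming.prices.connector=smallrye-amqp
# Exclude this channel from the readiness and liveness checks
mp.messaging.incoming.prices.health-enabled=false
```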

Note that a message processing failure nacks the message, which is then handled by the failure-strategy. It is the responsibility of the failure-strategy to report the failure and influence the outcome of the checks. The fail failure strategy reports the failure, and so the check will report the fault.

Using RabbitMQ

This connector is for AMQP 1.0. RabbitMQ implements AMQP 0.9.1. RabbitMQ does not provide AMQP 1.0 by default, but there is a plugin for it. To use RabbitMQ with this connector, enable and configure the AMQP 1.0 plugin.

Despite the existence of the plugin, a few AMQP 1.0 features won't work with RabbitMQ. Thus, we recommend the following configurations.

To receive messages from RabbitMQ:

  • Set durable to false

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false

To send messages to RabbitMQ:

  • Set the destination address (anonymous senders are not supported)

  • Set use-anonymous-sender to false

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false

As a consequence, it's not possible to change the destination dynamically (using message metadata) when using RabbitMQ.

Receiving Cloud Events

The AMQP connector supports Cloud Events. When the connector detects a structured or binary Cloud Event, it adds an IncomingCloudEventMetadata<T> to the metadata of the Message. IncomingCloudEventMetadata contains accessors for the mandatory and optional Cloud Event attributes.

If the connector cannot extract the Cloud Event metadata, it sends the Message without the metadata.

For more information on receiving Cloud Events, see Receiving Cloud Events in the SmallRye Reactive Messaging documentation.

Sending Cloud Events

The AMQP connector supports Cloud Events. The connector sends the outbound record as a Cloud Event if:

  • the message metadata contains an io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instance,

  • the channel configuration defines the cloud-events-type and cloud-events-source attributes.
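
As a sketch of the channel-configuration case, the following fragment sets both attributes on the generated-price outgoing channel used elsewhere in this guide. The type and source values are placeholders chosen for illustration.

```properties
mp.messaging.outgoing.generated-price.connector=smallrye-amqp
# Default Cloud Event attributes for records sent on this channel (placeholder values)
mp.messaging.outgoing.generated-price.cloud-events-type=com.example.price.generated
mp.messaging.outgoing.generated-price.cloud-events-source=/price-service
```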

For more information on sending Cloud Events, see Sending Cloud Events in the SmallRye Reactive Messaging documentation.

AMQP Connector Configuration Reference

Incoming channel configuration

| Attribute (alias) | Description | Mandatory | Default |
| --- | --- | --- | --- |
| address | The AMQP address. If not set, the channel name is used. Type: string | false | |
| auto-acknowledgement | Whether the received AMQP messages must be acknowledged when received. Type: boolean | false | false |
| broadcast | Whether the received AMQP messages must be dispatched to multiple subscribers. Type: boolean | false | false |
| capabilities | A comma-separated list of capabilities proposed by the sender or receiver client. Type: string | false | |
| client-options-name (amqp-client-options-name) | The name of the AMQP Client Option bean used to customize the AMQP client configuration. Type: string | false | |
| cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata. Type: boolean | false | true |
| connect-timeout (amqp-connect-timeout) | The connection timeout in milliseconds. Type: int | false | 1000 |
| container-id | The AMQP container id. Type: string | false | |
| durable | Whether the AMQP subscription is durable. Type: boolean | false | false |
| failure-strategy | Specify the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are fail (default), accept, release, reject, modified-failed, modified-failed-undeliverable-here. Type: string | false | fail |
| health-timeout | The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. Type: int | false | 3 |
| host (amqp-host) | The broker hostname. Type: string | false | localhost |
| link-name | The name of the link. If not set, the channel name is used. Type: string | false | |
| password (amqp-password) | The password used to authenticate to the broker. Type: string | false | |
| port (amqp-port) | The broker port. Type: int | false | 5672 |
| reconnect-attempts (amqp-reconnect-attempts) | The number of reconnection attempts. Type: int | false | 100 |
| reconnect-interval (amqp-reconnect-interval) | The interval in seconds between two reconnection attempts. Type: int | false | 10 |
| sni-server-name (amqp-sni-server-name) | If set, explicitly override the hostname to use for the TLS SNI server name. Type: string | false | |
| selector | Sets a message selector. This attribute is used to define an apache.org:selector-filter:string filter on the source terminus, using SQL-based syntax to request that the server filter which messages are delivered to the receiver (if supported by the server in question). Precise functionality supported and syntax needed can vary depending on the server. Type: string | false | |
| tracing-enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | true |
| use-ssl (amqp-use-ssl) | Whether the AMQP connection uses SSL/TLS. Type: boolean | false | false |
| username (amqp-username) | The username used to authenticate to the broker. Type: string | false | |
| virtual-host (amqp-virtual-host) | If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use). Type: string | false | |

Outgoing channel configuration

| Attribute (alias) | Description | Mandatory | Default |
| --- | --- | --- | --- |
| address | The AMQP address. If not set, the channel name is used. Type: string | false | |
| capabilities | A comma-separated list of capabilities proposed by the sender or receiver client. Type: string | false | |
| client-options-name (amqp-client-options-name) | The name of the AMQP Client Option bean used to customize the AMQP client configuration. Type: string | false | |
| cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata. Type: boolean | false | true |
| cloud-events-data-content-type (cloud-events-default-data-content-type) | Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself. Type: string | false | |
| cloud-events-data-schema (cloud-events-default-data-schema) | Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself. Type: string | false | |
| cloud-events-insert-timestamp (cloud-events-default-timestamp) | Whether the connector should automatically insert the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself. Type: boolean | false | true |
| cloud-events-mode | The Cloud Event mode (structured or binary (default)). Indicates how the Cloud Events are written in the outgoing record. Type: string | false | binary |
| cloud-events-source (cloud-events-default-source) | Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself. Type: string | false | |
| cloud-events-subject (cloud-events-default-subject) | Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself. Type: string | false | |
| cloud-events-type (cloud-events-default-type) | Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself. Type: string | false | |
| connect-timeout (amqp-connect-timeout) | The connection timeout in milliseconds. Type: int | false | 1000 |
| container-id | The AMQP container id. Type: string | false | |
| credit-retrieval-period | The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender runs out of credits. Type: int | false | 2000 |
| durable | Whether sent AMQP messages are marked durable. Type: boolean | false | false |
| health-timeout | The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. Type: int | false | 3 |
| host (amqp-host) | The broker hostname. Type: string | false | localhost |
| link-name | The name of the link. If not set, the channel name is used. Type: string | false | |
| merge | Whether the connector should allow multiple upstreams. Type: boolean | false | false |
| password (amqp-password) | The password used to authenticate to the broker. Type: string | false | |
| port (amqp-port) | The broker port. Type: int | false | 5672 |
| reconnect-attempts (amqp-reconnect-attempts) | The number of reconnection attempts. Type: int | false | 100 |
| reconnect-interval (amqp-reconnect-interval) | The interval in seconds between two reconnection attempts. Type: int | false | 10 |
| sni-server-name (amqp-sni-server-name) | If set, explicitly override the hostname to use for the TLS SNI server name. Type: string | false | |
| tracing-enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | true |
| ttl | The time-to-live of the sent AMQP messages. 0 to disable the TTL. Type: long | false | 0 |
| use-anonymous-sender | Whether the connector should use an anonymous sender. Default value is true if the broker supports it, false otherwise. If not supported, it is not possible to dynamically change the destination address. Type: boolean | false | |
| use-ssl (amqp-use-ssl) | Whether the AMQP connection uses SSL/TLS. Type: boolean | false | false |
| username (amqp-username) | The username used to authenticate to the broker. Type: string | false | |
| virtual-host (amqp-virtual-host) | If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use). Type: string | false | |

Conditionally configure channels

You can configure the channels using a specific profile. Thus, the channels are only configured (and added to the application) when the specified profile is enabled.

To achieve this, you need to:

  1. Prefix the mp.messaging.[incoming|outgoing].$channel entries with %my-profile such as %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. Use the @IfBuildProfile("my-profile") on the CDI beans containing @Incoming(channel) and @Outgoing(channel) annotations that need only to be enabled when the profile is enabled.
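
As a sketch of the first step, the following fragment configures the prices incoming channel only when a profile named my-profile is active. The profile name comes from the steps above; the channel name and address are reused from earlier examples and are otherwise arbitrary.

```properties
# Applied only when the `my-profile` profile is enabled
%my-profile.mp.messaging.incoming.prices.connector=smallrye-amqp
%my-profile.mp.messaging.incoming.prices.address=my-queue
```

The bean declaring @Incoming("prices") would then carry @IfBuildProfile("my-profile"), as described in the second step, so that the channel and its consumer are enabled together.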

Note that Reactive Messaging verifies that the graph is complete. So, when using such a conditional configuration, ensure the application works with and without the profile enabled.

Note that this approach can also be used to change the channel configuration based on a profile.