Reactive Messaging AMQP 1.0 Connector Reference Documentation
This guide is the companion to Getting Started with AMQP 1.0. It explains in more detail the configuration and usage of the AMQP connector for reactive messaging.
This documentation does not cover all the details of the connector. Refer to the SmallRye Reactive Messaging website for further details.
The AMQP connector allows Quarkus applications to send and receive messages using the AMQP 1.0 protocol. More details about the protocol can be found in the AMQP 1.0 specification. Note that AMQP 1.0 and AMQP 0.9.1 (implemented by RabbitMQ) are incompatible. See Using RabbitMQ for more details.
AMQP connector extension
To use the connector, you need to add the quarkus-messaging-amqp extension.
You can add the extension with the Quarkus tooling, or just add the following dependency to your project (Maven first, then Gradle):
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-amqp")
Once added to your project, you can map channels to AMQP addresses by configuring the connector attribute:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
Connector auto-attachment
If you have a single connector on your classpath, you can omit the connector attribute: Quarkus attaches the channels to that connector automatically. This auto-attachment can be disabled in the application configuration.
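The property that disables auto-attachment is not shown in this section. Assuming a recent Quarkus release in which the messaging extensions use the quarkus.messaging prefix (an assumption to verify against your version), the setting would look like this:

```properties
# Assumed property name: disables automatic attachment of channels
# to the single connector found on the classpath
quarkus.messaging.auto-connector-attachment=false
```

With auto-attachment disabled, every channel needs an explicit connector attribute again.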
Configuring the AMQP Broker access
The AMQP connector connects to AMQP 1.0 brokers such as Apache ActiveMQ or Artemis. To configure the location and credentials of the broker, add the following properties to application.properties:
amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)
mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 | Configures the broker/router host name. You can do it per channel (using the host attribute) or globally using amqp-host. |
2 | Configures the broker/router port. You can do it per channel (using the port attribute) or globally using amqp-port. The default is 5672. |
3 | Configures the broker/router username if required. You can do it per channel (using the username attribute) or globally using amqp-username. |
4 | Configures the broker/router password if required. You can do it per channel (using the password attribute) or globally using amqp-password. |
5 | Instructs the prices channel to be managed by the AMQP connector. |
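The per-channel form mentioned in the callouts can be sketched as follows for the prices channel. The values simply mirror the global example; a per-channel value is expected to take precedence over the corresponding amqp-* alias for that channel.

```properties
# Per-channel broker settings for the `prices` channel
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.host=amqp
mp.messaging.incoming.prices.port=5672
mp.messaging.incoming.prices.username=my-username
mp.messaging.incoming.prices.password=my-password
```

This form is useful when different channels talk to different brokers.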
In dev mode and when running tests, Dev Services for AMQP automatically starts an AMQP broker.
Receiving AMQP messages
Let’s imagine your application receives Message<Double>. You can consume the payload directly:
package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}
Or, you can retrieve the Message<Double>:
package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}
Inbound Metadata
Messages coming from AMQP contain an instance of IncomingAmqpMetadata in the metadata.
Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});
Deserialization
The connector converts incoming AMQP messages into Reactive Messaging Message<T> instances. T depends on the body of the received AMQP message.
The AMQP Type System defines the supported types.
AMQP Body Type | <T> |
---|---|
AMQP Value containing an AMQP Primitive Type | the corresponding Java type |
AMQP Value using the Binary type | byte[] |
AMQP Sequence | List |
AMQP Data (with binary content) and the content-type is set to application/json | JsonObject |
AMQP Data with a different content-type | byte[] |
If you send objects with this AMQP connector (outbound connector), they are encoded as JSON and sent as binary. The content-type is set to application/json. So, you can rebuild the object as follows:
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
//
@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") (1)
    public void consume(JsonObject p) { (2)
        Price price = p.mapTo(Price.class); (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 | The Price instances are automatically encoded to JSON by the connector |
2 | You can receive it using a JsonObject |
3 | Then, you can reconstruct the instance using the mapTo method |
Acknowledgement
When a Reactive Messaging Message associated with an AMQP message is acknowledged, it informs the broker that the message has been accepted.
Failure Management
If a message produced from an AMQP message is nacked, a failure strategy is applied. The AMQP connector supports six strategies:
- fail - fail the application; no more AMQP messages will be processed (default). The AMQP message is marked as rejected.
- accept - this strategy marks the AMQP message as accepted. The processing continues, ignoring the failure. Refer to the accepted delivery state documentation.
- release - this strategy marks the AMQP message as released. The processing continues with the next message. The broker can redeliver the message. Refer to the released delivery state documentation.
- reject - this strategy marks the AMQP message as rejected. The processing continues with the next message. Refer to the rejected delivery state documentation.
- modified-failed - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). The processing continues with the next message, but the broker may attempt to redeliver the message. Refer to the modified delivery state documentation.
- modified-failed-undeliverable-here - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). It also indicates that the application cannot process the message, meaning that the broker will not attempt to redeliver the message to this node. The processing continues with the next message. Refer to the modified delivery state documentation.
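A strategy from the list above is selected per incoming channel with the failure-strategy attribute. A minimal sketch for the prices channel used throughout this guide (the choice of release is only an illustration):

```properties
mp.messaging.incoming.prices.connector=smallrye-amqp
# Hand nacked messages back to the broker instead of failing the application
mp.messaging.incoming.prices.failure-strategy=release
```

Whether a released message is redelivered, and how quickly, depends on the broker.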
Sending AMQP messages
Serialization
When sending a Message<T>, the connector converts the message into an AMQP message. The payload is converted to the AMQP message body.
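T | AMQP Message Body |
---|---|
primitive types or String | AMQP Value with the payload |
Instant or UUID | AMQP Value using the corresponding AMQP Type |
JsonObject or JsonArray | AMQP Data using a binary content. The content-type is set to application/json |
io.vertx.mutiny.core.buffer.Buffer | AMQP Data using a binary content. No content-type set |
Any other class | The payload is converted to JSON (using a Json Mapper). The result is wrapped into AMQP Data using a binary content. The content-type is set to application/json |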
If the message payload cannot be serialized to JSON, the message is nacked.
Outbound Metadata
When sending Messages, you can add an instance of OutgoingAmqpMetadata to influence how the message is sent to AMQP. For example, you can configure the subject and properties:
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
        .withDurable(true)
        .withSubject("my-subject")
        .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
Dynamic address names
Sometimes it is desirable to select the destination of a message dynamically. In this case, you should not configure the address inside your application configuration file, but instead use the outbound metadata to set the address.
For example, you can send to a dynamic address based on the incoming message:
String addressName = selectAddressFromIncomingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
        .withAddress(addressName)
        .withDurable(true)
        .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
To be able to set the address per message, the connector uses an anonymous sender.
Acknowledgement
By default, the Reactive Messaging Message is acknowledged when the broker acknowledges the message. When using routers, this acknowledgement may not be enabled. In this case, configure the auto-acknowledgement attribute to acknowledge the message as soon as it has been sent to the router.
If an AMQP message is rejected/released/modified by the broker (or cannot be sent successfully), the message is nacked.
Back Pressure and Credits
Back-pressure is handled by AMQP credits. The outbound connector only requests as many messages as the credits allow. When the number of credits reaches 0, it waits (in a non-blocking fashion) until the broker grants more credits to the AMQP sender.
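The credit mechanism can be pictured with a small toy model. The sketch below is not the connector's implementation: CreditGate and its methods are hypothetical names, and the pending queue merely stands in for the upstream that the real connector would not request from while it has no credits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of credit-based flow control (hypothetical, for illustration only). */
public class CreditGate {
    private final Queue<String> pending = new ArrayDeque<>(); // stands in for upstream
    private final List<String> sent = new ArrayList<>();      // messages handed to the broker
    private int credits;                                       // credits granted so far and not yet used

    /** Offers a message; it is sent right away only if a credit is available. */
    public void send(String message) {
        pending.add(message);
        drain();
    }

    /** Simulates the broker granting more credits to the sender. */
    public void grant(int amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("amount must not be negative");
        }
        credits += amount;
        drain();
    }

    // Sends as many pending messages as the remaining credits allow.
    private void drain() {
        while (credits > 0 && !pending.isEmpty()) {
            sent.add(pending.poll());
            credits--;
        }
    }

    public List<String> sent() {
        return List.copyOf(sent);
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        CreditGate gate = new CreditGate();
        gate.grant(2);
        gate.send("a");
        gate.send("b");
        gate.send("c"); // no credit left: stays pending
        System.out.println(gate.sent() + " pending=" + gate.pendingCount());
        gate.grant(1);  // broker grants one more credit
        System.out.println(gate.sent() + " pending=" + gate.pendingCount());
    }
}
```

Running main prints `[a, b] pending=1` and then `[a, b, c] pending=0`: the third message only goes out once a new credit arrives.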
Configuring the AMQP address
You can configure the AMQP address using the address attribute:
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue
mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue
If the address attribute is not set, the connector uses the channel name.
To use an existing queue, you need to configure the address, container-id and, optionally, the link-name attributes. For example, if you have an Apache Artemis broker configured with:
<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>
You need the following configuration:
mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people
You may need to configure the link-name attribute if the queue name is not the channel name:
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people
To use a MULTICAST queue, you need to provide the FQQN (fully-qualified queue name) instead of just the name of the queue:
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo
mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people
More details about the AMQP address model can be found in the Artemis documentation.
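The address-name::queue-name syntax used for the FQQN above can be captured in a tiny helper. Fqqn is a hypothetical class written for this illustration and is not part of the connector or of Artemis.

```java
/** Hypothetical helper that builds a fully-qualified queue name (FQQN). */
public final class Fqqn {

    private Fqqn() {
        // utility class, no instances
    }

    /** Joins an address and a queue name using the address::queue syntax. */
    public static String of(String address, String queue) {
        if (address == null || address.isBlank() || queue == null || queue.isBlank()) {
            throw new IllegalArgumentException("address and queue must be non-blank");
        }
        return address + "::" + queue;
    }

    public static void main(String[] args) {
        // Same names as in the configuration example: address foo, queue bar
        System.out.println(Fqqn.of("foo", "bar"));
    }
}
```

The printed value, `foo::bar`, is what the incoming channel's address attribute is set to in the configuration above.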
Execution model and Blocking processing
Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. However, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation, indicating that the processing is blocking and should not be run on the caller thread.
For example, the following code illustrates how you can store incoming payloads in a database using Hibernate with Panache:
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking // run on a worker thread, not on the I/O thread
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}
There are 2 @Blocking annotations:
- io.smallrye.reactive.messaging.annotations.Blocking
- io.smallrye.common.annotation.Blocking
They have the same effect. Thus, you can use either. The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
@RunOnVirtualThread
For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.
@Transactional
If your method is annotated with @Transactional, it is automatically considered blocking, even if the method is not annotated with @Blocking.
Customizing the underlying AMQP client
The connector uses the Vert.x AMQP client underneath. More details about this client can be found on the Vert.x website.
You can customize the underlying client configuration by producing an instance of AmqpClientOptions as follows:
import io.smallrye.common.annotation.Identifier;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import jakarta.enterprise.inject.Produces;

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
    // You can use the produced options to configure the TLS connection
    PemKeyCertOptions keycert = new PemKeyCertOptions()
            .addCertPath("./tls/tls.crt")
            .addKeyPath("./tls/tls.key");
    PemTrustOptions trust = new PemTrustOptions().addCertPath("./tls/ca.crt");
    return new AmqpClientOptions()
            .setSsl(true)
            .setPemKeyCertOptions(keycert)
            .setPemTrustOptions(trust)
            .addEnabledSaslMechanism("EXTERNAL")
            .setHostnameVerificationAlgorithm("") // Disables the hostname verification. Default is "HTTPS"
            .setConnectTimeout(30000)
            .setReconnectInterval(5000)
            .setContainerId("my-container");
}
This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the options bean using the client-options-name attribute:
mp.messaging.incoming.prices.client-options-name=my-named-options
If you experience frequent disconnections from the broker, the AmqpClientOptions can also be used to set a heartbeat to keep the AMQP connection alive. Some brokers might terminate the AMQP connection after a certain idle timeout. You can provide a heartbeat value, which the Vert.x Proton client uses to advertise the idle timeout when opening the transport to a remote peer.
@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
    // set a heartbeat of 30s (in milliseconds)
    return new AmqpClientOptions()
            .setHeartbeat(30000);
}
Health reporting
If you use the AMQP connector with the quarkus-smallrye-health extension, it contributes to the readiness and liveness probes. The AMQP connector reports the readiness and liveness of each channel managed by the connector. At the moment, the AMQP connector uses the same logic for the readiness and liveness checks.
To disable health reporting, set the health-enabled attribute for the channel to false. On the inbound side (receiving messages from AMQP), the check verifies that the receiver is attached to the broker. On the outbound side (sending records to AMQP), the check verifies that the sender is attached to the broker.
Note that a message processing failure nacks the message, which is then handled by the failure-strategy. It is the responsibility of the failure-strategy to report the failure and influence the outcome of the checks. The fail failure strategy reports the failure, so the check reports the fault.
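Health reporting is switched off per channel with the health-enabled attribute described above. A minimal sketch for the prices channel used elsewhere in this guide:

```properties
# Exclude the `prices` channel from the readiness and liveness checks
mp.messaging.incoming.prices.health-enabled=false
```

Other channels managed by the connector keep contributing to the probes.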
Using RabbitMQ
This connector is for AMQP 1.0. RabbitMQ implements AMQP 0.9.1. RabbitMQ does not provide AMQP 1.0 by default, but there is a plugin for it. To use RabbitMQ with this connector, enable and configure the AMQP 1.0 plugin.
Despite the existence of the plugin, a few AMQP 1.0 features won’t work with RabbitMQ. Thus, we recommend the following configurations.
To receive messages from RabbitMQ:
- Set durable to false
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false
To send messages to RabbitMQ:
- Set the destination address (anonymous senders are not supported)
- Set use-anonymous-sender to false
mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false
As a consequence, it’s not possible to change the destination dynamically (using message metadata) when using RabbitMQ.
Receiving Cloud Events
The AMQP connector supports Cloud Events. When the connector detects a structured or binary Cloud Event, it adds an IncomingCloudEventMetadata<T> to the metadata of the Message. IncomingCloudEventMetadata contains accessors to the mandatory and optional Cloud Event attributes.
If the connector cannot extract the Cloud Event metadata, it sends the Message without the metadata.
For more information on receiving Cloud Events, see Receiving Cloud Events in the SmallRye Reactive Messaging documentation.
Sending Cloud Events
The AMQP connector supports Cloud Events. The connector sends the outbound record as a Cloud Event if:
- the message metadata contains an io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instance,
- the channel configuration defines the cloud-events-type and cloud-events-source attributes.
For more information on sending Cloud Events, see Sending Cloud Events in the SmallRye Reactive Messaging documentation.
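The channel-level attributes named in the second condition can be sketched as follows. The channel name orders is borrowed from the address example earlier in this guide, and the type and source values are placeholders chosen for illustration.

```properties
mp.messaging.outgoing.orders.connector=smallrye-amqp
# Default Cloud Event attributes for this channel (placeholder values)
mp.messaging.outgoing.orders.cloud-events-type=com.example.order.created
mp.messaging.outgoing.orders.cloud-events-source=/orders
```

Both attributes are listed in the outgoing channel configuration reference.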
AMQP Connector Configuration Reference
Quarkus specific configuration
Incoming channel configuration
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
address | The AMQP address. If not set, the channel name is used. Type: string | false | |
auto-acknowledgement | Whether the received AMQP messages must be acknowledged when received. Type: boolean | false | |
broadcast | Whether the received AMQP messages must be dispatched to multiple subscribers. Type: boolean | false | |
capabilities | A comma-separated list of capabilities proposed by the sender or receiver client. Type: string | false | |
client-options-name (amqp-client-options-name) | The name of the AMQP Client Option bean used to customize the AMQP client configuration. Type: string | false | |
cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata. Type: boolean | false | |
connect-timeout (amqp-connect-timeout) | The connection timeout in milliseconds. Type: int | false | |
container-id | The AMQP container id. Type: string | false | |
durable | Whether the AMQP subscription is durable. Type: boolean | false | |
failure-strategy | Specifies the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are fail (default), accept, release, reject, modified-failed, modified-failed-undeliverable-here. Type: string | false | fail |
health-timeout | The maximum number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. Type: int | false | |
host (amqp-host) | The broker hostname. Type: string | false | |
link-name | The name of the link. If not set, the channel name is used. Type: string | false | |
password (amqp-password) | The password used to authenticate to the broker. Type: string | false | |
port (amqp-port) | The broker port. Type: int | false | 5672 |
reconnect-attempts (amqp-reconnect-attempts) | The number of reconnection attempts. Type: int | false | |
reconnect-interval (amqp-reconnect-interval) | The interval in seconds between two reconnection attempts. Type: int | false | |
sni-server-name (amqp-sni-server-name) | If set, explicitly overrides the hostname to use for the TLS SNI server name. Type: string | false | |
selector | Sets a message selector. Type: string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | |
use-ssl (amqp-use-ssl) | Whether the AMQP connection uses SSL/TLS. Type: boolean | false | |
username (amqp-username) | The username used to authenticate to the broker. Type: string | false | |
virtual-host (amqp-virtual-host) | If set, configures the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use). Type: string | false | |
Outgoing channel configuration
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
address | The AMQP address. If not set, the channel name is used. Type: string | false | |
capabilities | A comma-separated list of capabilities proposed by the sender or receiver client. Type: string | false | |
client-options-name (amqp-client-options-name) | The name of the AMQP Client Option bean used to customize the AMQP client configuration. Type: string | false | |
cloud-events | Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata. Type: boolean | false | |
cloud-events-data-content-type (cloud-events-default-data-content-type) | Configures the default data content type of the outgoing Cloud Event. Type: string | false | |
cloud-events-data-schema (cloud-events-default-data-schema) | Configures the default data schema of the outgoing Cloud Event. Type: string | false | |
cloud-events-insert-timestamp (cloud-events-default-timestamp) | Whether the connector should automatically insert the timestamp into the outgoing Cloud Event. Type: boolean | false | |
cloud-events-mode | The Cloud Event mode (structured or binary). Type: string | false | |
cloud-events-source (cloud-events-default-source) | Configures the default source of the outgoing Cloud Event. Type: string | false | |
cloud-events-subject (cloud-events-default-subject) | Configures the default subject of the outgoing Cloud Event. Type: string | false | |
cloud-events-type (cloud-events-default-type) | Configures the default type of the outgoing Cloud Event. Type: string | false | |
connect-timeout (amqp-connect-timeout) | The connection timeout in milliseconds. Type: int | false | |
container-id | The AMQP container id. Type: string | false | |
credit-retrieval-period | The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This period is used when the sender runs out of credits. Type: int | false | |
durable | Whether sent AMQP messages are marked durable. Type: boolean | false | |
health-timeout | The maximum number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed. Type: int | false | |
host (amqp-host) | The broker hostname. Type: string | false | |
link-name | The name of the link. If not set, the channel name is used. Type: string | false | |
merge | Whether the connector should allow multiple upstreams. Type: boolean | false | |
password (amqp-password) | The password used to authenticate to the broker. Type: string | false | |
port (amqp-port) | The broker port. Type: int | false | 5672 |
reconnect-attempts (amqp-reconnect-attempts) | The number of reconnection attempts. Type: int | false | |
reconnect-interval (amqp-reconnect-interval) | The interval in seconds between two reconnection attempts. Type: int | false | |
sni-server-name (amqp-sni-server-name) | If set, explicitly overrides the hostname to use for the TLS SNI server name. Type: string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | |
ttl | The time-to-live of the sent AMQP messages. 0 to disable the TTL. Type: long | false | |
use-anonymous-sender | Whether the connector should use an anonymous sender. Type: boolean | false | |
use-ssl (amqp-use-ssl) | Whether the AMQP connection uses SSL/TLS. Type: boolean | false | |
username (amqp-username) | The username used to authenticate to the broker. Type: string | false | |
virtual-host (amqp-virtual-host) | If set, configures the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use). Type: string | false | |
Conditionally configure channels
You can configure the channels using a specific profile. Thus, the channels are only configured (and added to the application) when the specified profile is enabled.
To achieve this, you need to:
- Prefix the mp.messaging.[incoming|outgoing].$channel entries with %my-profile, such as %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value
- Use @IfBuildProfile("my-profile") on the CDI beans containing @Incoming(channel) and @Outgoing(channel) annotations that need to be enabled only when the profile is enabled.
Note that reactive messaging verifies that the graph is complete. So, when using such a conditional configuration, ensure the application works with and without the profile enabled.
Note that this approach can also be used to change the channel configuration based on a profile.
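As a minimal sketch of the prefixing rule, the entries below configure the prices channel only when the my-profile profile is active. The channel name and address are reused from earlier examples in this guide.

```properties
# Only applied when the `my-profile` profile is active
%my-profile.mp.messaging.incoming.prices.connector=smallrye-amqp
%my-profile.mp.messaging.incoming.prices.address=my-queue
```

The bean consuming prices would carry @IfBuildProfile("my-profile") so that the graph stays complete when the profile is not enabled.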