Reactive Messaging RabbitMQ Connector Reference Documentation
This guide is the companion to Getting Started with RabbitMQ. It explains in more detail the configuration and usage of the RabbitMQ connector for reactive messaging.
This documentation does not cover all the details of the connector. Refer to the SmallRye Reactive Messaging website for further details.
The RabbitMQ connector allows Quarkus applications to send and receive messages using the AMQP 0.9.1 protocol. More details about the protocol can be found in the AMQP 0.9.1 specification.
The RabbitMQ connector supports AMQP 0-9-1, which is very different from the AMQP 1.0 protocol used by the AMQP 1.0 connector. You can use the AMQP 1.0 connector with RabbitMQ as described in the AMQP 1.0 connector reference, albeit with reduced functionality.
RabbitMQ connector extension
To use the connector, you need to add the quarkus-messaging-rabbitmq extension.
You can add the extension to your project using:
> ./mvnw quarkus:add-extensions -Dextensions="quarkus-messaging-rabbitmq"
Or just add the following dependency to your project:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-rabbitmq</artifactId>
</dependency>
Once added to your project, you can map channels to RabbitMQ exchanges or queues by configuring the connector attribute:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-rabbitmq
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-rabbitmq
Channels declared as outgoing are mapped to RabbitMQ exchanges, and channels declared as incoming are mapped to RabbitMQ queues, as required by the broker.
Configuring the RabbitMQ Broker access
The RabbitMQ connector connects to RabbitMQ brokers.
To configure the location and credentials of the broker, add the following properties to application.properties:
rabbitmq-host=amqp (1)
rabbitmq-port=5672 (2)
rabbitmq-username=my-username (3)
rabbitmq-password=my-password (4)
mp.messaging.incoming.prices.connector=smallrye-rabbitmq (5)
1 | Configures the broker host name. You can do it per channel (using the host attribute) or globally using rabbitmq-host. |
2 | Configures the broker port. You can do it per channel (using the port attribute) or globally using rabbitmq-port. The default is 5672. |
3 | Configures the broker username if required. You can do it per channel (using the username attribute) or globally using rabbitmq-username. |
4 | Configures the broker password if required. You can do it per channel (using the password attribute) or globally using rabbitmq-password. |
5 | Instructs the prices channel to be managed by the RabbitMQ connector. |
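The per-channel and global forms described in the callouts can be modelled with a small lookup. The following is only a sketch using the JDK Properties class, not the connector's actual resolution code. The class name BrokerSettingSketch, the method resolve, and the rule that a per-channel attribute wins over the global rabbitmq-* alias are assumptions made for illustration.

```java
import java.util.Properties;

// Illustrative model only; the connector resolves its configuration internally.
final class BrokerSettingSketch {

    /**
     * Looks up mp.messaging.<direction>.<channel>.<attribute> first,
     * then the global rabbitmq-<attribute> alias, then the given fallback.
     * Assumption: the per-channel value takes precedence over the global one.
     */
    static String resolve(Properties config, String direction, String channel,
                          String attribute, String fallback) {
        String perChannel = config.getProperty(
                "mp.messaging." + direction + "." + channel + "." + attribute);
        if (perChannel != null) {
            return perChannel;
        }
        return config.getProperty("rabbitmq-" + attribute, fallback);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("rabbitmq-host", "amqp");
        config.setProperty("mp.messaging.incoming.prices.host", "broker-a");

        System.out.println(resolve(config, "incoming", "prices", "host", "unset"));
        System.out.println(resolve(config, "outgoing", "orders", "host", "unset"));
        // 5672 is the default port stated in the callouts above.
        System.out.println(resolve(config, "incoming", "prices", "port", "5672"));
    }
}
```

In this model, a channel without its own host attribute falls back to rabbitmq-host, which is why a single global setting is usually enough when all channels talk to the same broker.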
In dev mode and when running tests, Dev Services for RabbitMQ automatically starts a RabbitMQ broker.
Receiving RabbitMQ messages
Let's imagine your application receives Message<Double>. You can consume the payload directly:
package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class RabbitMQPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }
}
Or, you can retrieve the Message<Double>:
package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class RabbitMQPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the RabbitMQ message as `accepted`.
        return price.ack();
    }
}
Inbound Metadata
Messages coming from RabbitMQ contain an instance of IncomingRabbitMQMetadata in the metadata.
Optional<IncomingRabbitMQMetadata> metadata = incoming.getMetadata(IncomingRabbitMQMetadata.class);
metadata.ifPresent(meta -> {
    final Optional<String> contentEncoding = meta.getContentEncoding();
    final Optional<String> contentType = meta.getContentType();
    final Optional<String> correlationId = meta.getCorrelationId();
    final Optional<ZonedDateTime> creationTime = meta.getCreationTime(ZoneId.systemDefault());
    final Optional<Integer> priority = meta.getPriority();
    final Optional<String> replyTo = meta.getReplyTo();
    final Optional<String> userId = meta.getUserId();

    // Access a single String-valued header
    final Optional<String> stringHeader = meta.getHeader("my-header", String.class);

    // Access all headers
    final Map<String,Object> headers = meta.getHeaders();
    // ...
});
Deserialization
The connector converts incoming RabbitMQ Messages into Reactive Messaging Message<T> instances. The payload type T depends on the value of the content_type and content_encoding properties of the received RabbitMQ message envelope.
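content_encoding | content_type | T |
---|---|---|
Value present | n/a | byte[] |
No value | text/plain | String |
No value | application/json | a JSON element which can be a JsonArray, JsonObject, String, etc., depending on whether the buffer contains an array, object, string, etc. |
No value | Anything else | byte[] |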
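The selection rules can be read as a small decision function: a message with a content_encoding is delivered as raw bytes; otherwise text/plain gives a String, application/json gives a JSON element, and anything else gives raw bytes. The sketch below models that decision with JDK types only. It is not the connector's implementation; the names PayloadTypeSketch, PayloadKind and kindFor are hypothetical, and the exact, case-sensitive comparison of the MIME type is an assumption.

```java
import java.util.Optional;

// Illustrative model only; not the connector's actual implementation.
final class PayloadTypeSketch {

    /** The three payload families: byte[], String, or a JSON element. */
    enum PayloadKind { BYTES, STRING, JSON }

    static PayloadKind kindFor(Optional<String> contentEncoding, Optional<String> contentType) {
        // Any content_encoding value means the body is passed through as bytes.
        if (contentEncoding.isPresent()) {
            return PayloadKind.BYTES;
        }
        // Assumption: the content type is compared exactly, without parameters.
        switch (contentType.orElse("")) {
            case "text/plain":
                return PayloadKind.STRING;
            case "application/json":
                return PayloadKind.JSON;
            default:
                return PayloadKind.BYTES;
        }
    }

    public static void main(String[] args) {
        System.out.println(kindFor(Optional.empty(), Optional.of("application/json")));
        System.out.println(kindFor(Optional.of("gzip"), Optional.of("text/plain")));
    }
}
```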
If you send objects with this RabbitMQ connector (outbound connector), they are encoded as JSON and sent with content_type set to application/json. You can receive this payload using (Vert.x) JSON Objects, and then map it to the object class you want:
@ApplicationScoped
public static class Generator {

    @Outgoing("to-rabbitmq")
    public Multi<Price> prices() { (1)
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
                .map(l -> new Price().setPrice(count.incrementAndGet()))
                .onOverflow().drop();
    }
}

@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-rabbitmq")
    public void consume(JsonObject p) { (2)
        Price price = p.mapTo(Price.class); (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 | The Price instances are automatically encoded to JSON by the connector |
2 | You can receive it using a JsonObject |
3 | Then, you can reconstruct the instance using the mapTo method |
Acknowledgement
When a Reactive Messaging Message associated with a RabbitMQ Message is acknowledged, it informs the broker that the message has been accepted.
Whether you need to explicitly acknowledge the message depends on the auto-acknowledgement setting for the channel; if it is set to true, your message is automatically acknowledged on receipt.
Failure Management
If a message produced from a RabbitMQ message is nacked, a failure strategy is applied. The RabbitMQ connector supports three strategies, controlled by the failure-strategy channel setting:
- fail - fail the application; no more RabbitMQ messages will be processed. The RabbitMQ message is marked as rejected.
- accept - this strategy marks the RabbitMQ message as accepted. The processing continues ignoring the failure.
- reject - this strategy marks the RabbitMQ message as rejected (default). The processing continues with the next message.
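The three strategies differ in only two respects: what the broker is told about the nacked message, and whether processing carries on. The sketch below captures that as a plain Java enum. It is a model for reading the list above, not the connector's code; the names FailureStrategySketch, Disposition, keepsProcessing and fromSetting are hypothetical.

```java
import java.util.Locale;

// Illustrative model of the three failure strategies; not the connector's code.
final class FailureStrategySketch {

    /** What the broker is told about the nacked message. */
    enum Disposition { ACCEPTED, REJECTED }

    enum Strategy {
        FAIL(Disposition.REJECTED, false),   // application fails, nothing more is processed
        ACCEPT(Disposition.ACCEPTED, true),  // failure is ignored
        REJECT(Disposition.REJECTED, true);  // default: move on to the next message

        final Disposition disposition;
        final boolean keepsProcessing;

        Strategy(Disposition disposition, boolean keepsProcessing) {
            this.disposition = disposition;
            this.keepsProcessing = keepsProcessing;
        }

        /** Maps a failure-strategy setting to a strategy; unset means reject. */
        static Strategy fromSetting(String value) {
            if (value == null || value.trim().isEmpty()) {
                return REJECT;
            }
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + s.disposition + ", keeps processing: " + s.keepsProcessing);
        }
    }
}
```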
Sending RabbitMQ messages
Serialization
When sending a Message<T>, the connector converts the message into a RabbitMQ Message. The payload is converted to the RabbitMQ Message body.
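T | RabbitMQ Message Body |
---|---|
primitive types or UUID/String | String value with content_type set to text/plain |
JsonObject or JsonArray | Serialized String payload with content_type set to application/json |
io.vertx.mutiny.core.buffer.Buffer | Binary content, with content_type set to application/octet-stream |
byte[] | Binary content, with content_type set to application/octet-stream |
Any other class | The payload is converted to JSON (using a Json Mapper) then serialized with content_type set to application/json |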
If the message payload cannot be serialized to JSON, the message is nacked.
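As a rough illustration of the outbound mapping, the sketch below picks a content_type from the Java type of the payload: text/plain for primitive wrappers, UUID and String, application/octet-stream for byte[], and application/json for any other class. It uses JDK types only, so the Vert.x JsonObject, JsonArray and Buffer cases are left out. The class and method names are hypothetical, and this is not how the connector is implemented.

```java
import java.util.Objects;
import java.util.UUID;

// Illustrative model only; the Vert.x payload types are intentionally omitted.
final class OutgoingContentTypeSketch {

    static String contentTypeFor(Object payload) {
        Objects.requireNonNull(payload, "payload");
        // Primitive wrappers, UUID and String are sent as their String value.
        if (payload instanceof String || payload instanceof UUID
                || payload instanceof Number || payload instanceof Boolean
                || payload instanceof Character) {
            return "text/plain";
        }
        // Raw bytes are sent as binary content.
        if (payload instanceof byte[]) {
            return "application/octet-stream";
        }
        // Any other class is converted to JSON.
        return "application/json";
    }

    public static void main(String[] args) {
        System.out.println(contentTypeFor("Hello"));
        System.out.println(contentTypeFor(new byte[] {1, 2, 3}));
        System.out.println(contentTypeFor(new java.util.ArrayList<String>()));
    }
}
```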
Outbound Metadata
When sending Messages, you can add an instance of OutgoingRabbitMQMetadata to influence how the message is handled by RabbitMQ. For example, you can configure the routing key, timestamp and headers:
final OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder()
        .withHeader("my-header", "xyzzy")
        .withRoutingKey("urgent")
        .withTimestamp(ZonedDateTime.now())
        .build();

// Add `metadata` to the metadata of the outgoing message.
return Message.of("Hello", Metadata.of(metadata));
Configuring the RabbitMQ Exchange/Queue
You can configure the RabbitMQ exchange or queue associated with a channel using properties on the channel configuration. Channels declared as incoming are mapped to RabbitMQ queues, and channels declared as outgoing are mapped to RabbitMQ exchanges.
For example:
mp.messaging.incoming.prices.connector=smallrye-rabbitmq
mp.messaging.incoming.prices.queue.name=my-queue
mp.messaging.outgoing.orders.connector=smallrye-rabbitmq
mp.messaging.outgoing.orders.exchange.name=my-order-queue
If the exchange.name or queue.name attribute is not set, the connector uses the channel name.
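The fallback to the channel name can be pictured as a lookup with a default. The sketch below models it with the JDK Properties class; it is not the connector's code, and the class ChannelNameSketch and its two methods are hypothetical names used only for illustration.

```java
import java.util.Properties;

// Illustrative model of the name fallback; not the connector's actual code.
final class ChannelNameSketch {

    /** Queue used by an incoming channel: queue.name if set, else the channel name. */
    static String queueName(Properties config, String channel) {
        return config.getProperty("mp.messaging.incoming." + channel + ".queue.name", channel);
    }

    /** Exchange used by an outgoing channel: exchange.name if set, else the channel name. */
    static String exchangeName(Properties config, String channel) {
        return config.getProperty("mp.messaging.outgoing." + channel + ".exchange.name", channel);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("mp.messaging.incoming.prices.queue.name", "my-queue");

        System.out.println(queueName(config, "prices"));    // explicit queue.name
        System.out.println(exchangeName(config, "orders")); // falls back to the channel name
    }
}
```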
To use an existing exchange or queue, you need to configure the name and set the exchange's or queue's declare property to false.
For example, if you have a RabbitMQ broker configured with a people exchange and queue, you need the following configuration:
mp.messaging.incoming.people.connector=smallrye-rabbitmq
mp.messaging.incoming.people.queue.name=people
mp.messaging.incoming.people.queue.declare=false
mp.messaging.outgoing.people.connector=smallrye-rabbitmq
mp.messaging.outgoing.people.exchange.name=people
mp.messaging.outgoing.people.exchange.declare=false
Execution model and Blocking processing
Reactive Messaging invokes your method on an I/O thread.
See the Quarkus Reactive Architecture documentation for further details on this topic.
However, you often need to combine Reactive Messaging with blocking processing such as database interactions.
For this, you need to use the @Blocking annotation, indicating that the processing is blocking and should not be run on the caller thread.
For example, the following code illustrates how you can store incoming payloads in a database using Hibernate with Panache:
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }
}
There are 2 @Blocking annotations: io.smallrye.reactive.messaging.annotations.Blocking and io.smallrye.common.annotation.Blocking.
They have the same effect, so you can use either. The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.
@RunOnVirtualThread
For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.
Customizing the underlying RabbitMQ client
The connector uses the Vert.x RabbitMQ client underneath. More details about this client can be found on the Vert.x website.
You can customize the underlying client configuration by producing an instance of RabbitMQOptions as follows:
@Produces
@Identifier("my-named-options")
public RabbitMQOptions getNamedOptions() {
    PemKeyCertOptions keycert = new PemKeyCertOptions()
            .addCertPath("./tls/tls.crt")
            .addKeyPath("./tls/tls.key");
    PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
    // You can use the produced options to configure the TLS connection
    return new RabbitMQOptions()
            .setSsl(true)
            .setPemKeyCertOptions(keycert)
            .setPemTrustOptions(trust)
            .setUser("user1")
            .setPassword("password1")
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("vhost1")
            .setConnectionTimeout(6000) // in milliseconds
            .setRequestedHeartbeat(60) // in seconds
            .setHandshakeTimeout(6000) // in milliseconds
            .setRequestedChannelMax(5)
            .setNetworkRecoveryInterval(500) // in milliseconds
            .setAutomaticRecoveryEnabled(true);
}
This instance is retrieved and used to configure the client used by the connector.
You need to indicate the name of the client using the client-options-name attribute:
mp.messaging.incoming.prices.client-options-name=my-named-options
Health reporting
If you use the RabbitMQ connector with the quarkus-smallrye-health extension, it contributes to the readiness and liveness probes.
The RabbitMQ connector reports the readiness and liveness of each channel managed by the connector.
To disable health reporting, set the health-enabled attribute for the channel to false.
On the inbound side (receiving messages from RabbitMQ), the check verifies that the receiver is connected to the broker.
On the outbound side (sending messages to RabbitMQ), the check verifies that the sender is not disconnected from the broker; the sender may still be in an initialised state (connection not yet attempted), but this is regarded as live/ready.
Note that a message processing failure nacks the message, which is then handled by the failure-strategy.
It is the responsibility of the failure-strategy to report the failure and influence the outcome of the checks.
The fail failure strategy reports the failure, and so the check will report the fault.
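The asymmetry between the inbound and outbound checks is easy to miss: an inbound channel is healthy only when its receiver is connected, whereas an outbound channel is healthy unless its sender is disconnected, so a sender that has not yet attempted a connection still counts as live/ready. The sketch below models that rule. The state names and the class HealthCheckSketch are hypothetical and do not correspond to connector types.

```java
// Illustrative model of the health rules described above; state names are hypothetical.
final class HealthCheckSketch {

    enum ReceiverState { CONNECTED, NOT_CONNECTED }

    enum SenderState { INITIALISED, CONNECTED, DISCONNECTED }

    /** Inbound: healthy only when the receiver is connected to the broker. */
    static boolean inboundHealthy(ReceiverState state) {
        return state == ReceiverState.CONNECTED;
    }

    /** Outbound: healthy unless the sender is disconnected from the broker. */
    static boolean outboundHealthy(SenderState state) {
        return state != SenderState.DISCONNECTED;
    }

    public static void main(String[] args) {
        System.out.println("inbound, not connected: " + inboundHealthy(ReceiverState.NOT_CONNECTED));
        System.out.println("outbound, initialised: " + outboundHealthy(SenderState.INITIALISED));
    }
}
```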
Dynamic Credentials
Quarkus and the RabbitMQ connector support Vault's RabbitMQ secrets engine for generating short-lived dynamic credentials. This allows Vault to create and retire RabbitMQ credentials on a regular basis.
First we need to enable Vault's rabbitmq secret engine, configure it with RabbitMQ's connection and authentication information, and create a Vault role my-role (replace 10.0.0.3 with the actual host that is running the RabbitMQ container):
vault secrets enable rabbitmq
vault write rabbitmq/config/connection \
    connection_uri=http://10.0.0.3:15672 \
    username=guest \
    password=guest
vault write rabbitmq/roles/my-role \
    vhosts='{"/":{"write": ".*", "read": ".*"}}'
For this use case, the user guest configured above needs to be a RabbitMQ admin user with the capability to create credentials.
Then we need to give a read capability to the Quarkus application on path rabbitmq/creds/my-role.
cat <<EOF | vault policy write vault-rabbitmq-policy -
path "secret/data/myapps/vault-rabbitmq-test/*" {
  capabilities = ["read"]
}
path "rabbitmq/creds/my-role" {
  capabilities = [ "read" ]
}
EOF
Now that Vault knows how to create users in RabbitMQ, we need to configure Quarkus to use a credentials-provider for RabbitMQ.
First we tell Quarkus to request dynamic credentials using a credentials-provider named rabbitmq.
quarkus.rabbitmq.credentials-provider = rabbitmq
Next we configure the rabbitmq credentials provider. The credentials-role option must be set to the name of the role we created in Vault, in our case my-role. The credentials-mount option must be set to rabbitmq.
quarkus.vault.credentials-provider.rabbitmq.credentials-role=my-role
quarkus.vault.credentials-provider.rabbitmq.credentials-mount=rabbitmq
RabbitMQ Connector Configuration Reference
Incoming channel configuration
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
username (rabbitmq-username) | The username used to authenticate to the broker. Type: string | false | |
password (rabbitmq-password) | The password used to authenticate to the broker. Type: string | false | |
host (rabbitmq-host) | The broker hostname. Type: string | false | |
port (rabbitmq-port) | The broker port. Type: int | false | |
ssl (rabbitmq-ssl) | Whether the connection should use SSL. Type: boolean | false | |
trust-all (rabbitmq-trust-all) | Whether to skip trust certificate verification. Type: boolean | false | |
trust-store-path (rabbitmq-trust-store-path) | The path to a JKS trust store. Type: string | false | |
trust-store-password (rabbitmq-trust-store-password) | The password of the JKS trust store. Type: string | false | |
credentials-provider-name (rabbitmq-credentials-provider-name) | The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client. Type: string | false | |
connection-timeout | The TCP connection timeout (ms); 0 is interpreted as no timeout. Type: int | false | |
handshake-timeout | The AMQP 0-9-1 protocol handshake timeout (ms). Type: int | false | |
automatic-recovery-enabled | Whether automatic connection recovery is enabled. Type: boolean | false | |
automatic-recovery-on-initial-connection | Whether automatic recovery on initial connections is enabled. Type: boolean | false | |
reconnect-attempts (rabbitmq-reconnect-attempts) | The number of reconnection attempts. Type: int | false | |
reconnect-interval (rabbitmq-reconnect-interval) | The interval (in seconds) between two reconnection attempts. Type: int | false | |
network-recovery-interval | How long (ms) automatic recovery waits before attempting to reconnect. Type: int | false | |
user | The AMQP username to use when connecting to the broker. Type: string | false | |
include-properties | Whether to include properties when a broker message is passed on the event bus. Type: boolean | false | |
requested-channel-max | The initially requested maximum channel number. Type: int | false | |
requested-heartbeat | The initially requested heartbeat interval (seconds), zero for none. Type: int | false | |
use-nio | Whether usage of NIO Sockets is enabled. Type: boolean | false | |
virtual-host (rabbitmq-virtual-host) | The virtual host to use when connecting to the broker. Type: string | false | |
exchange.name | The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used. Type: string | false | |
exchange.durable | Whether the exchange is durable. Type: boolean | false | |
exchange.auto-delete | Whether the exchange should be deleted after use. Type: boolean | false | |
exchange.type | The exchange type: direct, fanout, headers or topic (default). Type: string | false | |
exchange.declare | Whether to declare the exchange; set to false if the exchange is expected to be set up independently. Type: boolean | false | |
tracing.enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | |
tracing.attribute-headers | A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true. Type: string | false | "" |
queue.name | The queue from which messages are consumed. Type: string | true | |
queue.durable | Whether the queue is durable. Type: boolean | false | |
queue.exclusive | Whether the queue is for exclusive use. Type: boolean | false | |
queue.auto-delete | Whether the queue should be deleted after use. Type: boolean | false | |
queue.declare | Whether to declare the queue and binding; set to false if these are expected to be set up independently. Type: boolean | false | |
queue.ttl | If specified, the time (ms) for which a message can remain in the queue undelivered before it is dead. Type: long | false | |
queue.single-active-consumer | If set to true, only one consumer can actively consume messages. Type: boolean | false | |
queue.x-queue-type | If the queue is declared automatically, the type of queue to declare [quorum, classic, stream]. Type: string | false | |
queue.x-queue-mode | If the queue is declared automatically, the mode of the queue [lazy, default]. Type: string | false | |
max-incoming-internal-queue-size | The maximum size of the incoming internal queue. Type: int | false | |
connection-count | The number of RabbitMQ connections to create for consuming from this queue. This might be necessary to consume from a sharded queue with a single client. Type: int | false | |
auto-bind-dlq | Whether to automatically declare the DLQ and bind it to the binder DLX. Type: boolean | false | |
dead-letter-queue-name | The name of the DLQ; if not supplied, defaults to the queue name with '.dlq' appended. Type: string | false | |
dead-letter-exchange | A DLX to assign to the queue. Relevant only if auto-bind-dlq is true. Type: string | false | |
dead-letter-exchange-type | The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true. Type: string | false | |
dead-letter-routing-key | A dead letter routing key to assign to the queue; if not supplied, defaults to the queue name. Type: string | false | |
dlx.declare | Whether to declare the dead letter exchange binding. Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently. Type: boolean | false | |
dead-letter-queue-type | If the DLQ is declared automatically, the type of DLQ to declare [quorum, classic, stream]. Type: string | false | |
dead-letter-queue-mode | If the DLQ is declared automatically, the mode of the DLQ [lazy, default]. Type: string | false | |
failure-strategy | The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are fail, accept, reject (default). Type: string | false | |
broadcast | Whether the received RabbitMQ messages must be dispatched to multiple subscribers. Type: boolean | false | |
auto-acknowledgement | Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement. Type: boolean | false | |
keep-most-recent | Whether to discard old messages instead of recent ones. Type: boolean | false | |
routing-keys | A comma-separated list of routing keys to bind the queue to the exchange. Type: string | false | |
content-type-override | Override the content_type attribute of the incoming message; should be a valid MIME type. Type: string | false | |
max-outstanding-messages | The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number. Type: int | false | |
Outgoing channel configuration
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
automatic-recovery-enabled | Whether automatic connection recovery is enabled. Type: boolean | false | |
automatic-recovery-on-initial-connection | Whether automatic recovery on initial connections is enabled. Type: boolean | false | |
connection-timeout | The TCP connection timeout (ms); 0 is interpreted as no timeout. Type: int | false | |
default-routing-key | The default routing key to use when sending messages to the exchange. Type: string | false | "" |
default-ttl | If specified, the time (ms) sent messages can remain in queues undelivered before they are dead. Type: long | false | |
exchange.auto-delete | Whether the exchange should be deleted after use. Type: boolean | false | |
exchange.declare | Whether to declare the exchange; set to false if the exchange is expected to be set up independently. Type: boolean | false | |
exchange.durable | Whether the exchange is durable. Type: boolean | false | |
exchange.name | The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used. Type: string | false | |
exchange.type | The exchange type: direct, fanout, headers or topic (default). Type: string | false | |
handshake-timeout | The AMQP 0-9-1 protocol handshake timeout (ms). Type: int | false | |
host (rabbitmq-host) | The broker hostname. Type: string | false | |
include-properties | Whether to include properties when a broker message is passed on the event bus. Type: boolean | false | |
max-inflight-messages | The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number. Type: long | false | |
max-outgoing-internal-queue-size | The maximum size of the outgoing internal queue. Type: int | false | |
network-recovery-interval | How long (ms) automatic recovery waits before attempting to reconnect. Type: int | false | |
password (rabbitmq-password) | The password used to authenticate to the broker. Type: string | false | |
port (rabbitmq-port) | The broker port. Type: int | false | |
reconnect-attempts (rabbitmq-reconnect-attempts) | The number of reconnection attempts. Type: int | false | |
reconnect-interval (rabbitmq-reconnect-interval) | The interval (in seconds) between two reconnection attempts. Type: int | false | |
requested-channel-max | The initially requested maximum channel number. Type: int | false | |
requested-heartbeat | The initially requested heartbeat interval (seconds), zero for none. Type: int | false | |
ssl (rabbitmq-ssl) | Whether the connection should use SSL. Type: boolean | false | |
tracing.attribute-headers | A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true. Type: string | false | "" |
tracing.enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | |
trust-all (rabbitmq-trust-all) | Whether to skip trust certificate verification. Type: boolean | false | |
trust-store-password (rabbitmq-trust-store-password) | The password of the JKS trust store. Type: string | false | |
trust-store-path (rabbitmq-trust-store-path) | The path to a JKS trust store. Type: string | false | |
credentials-provider-name (rabbitmq-credentials-provider-name) | The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client. Type: string | false | |
use-nio | Whether usage of NIO Sockets is enabled. Type: boolean | false | |
user | The AMQP username to use when connecting to the broker. Type: string | false | |
username (rabbitmq-username) | The username used to authenticate to the broker. Type: string | false | |
virtual-host (rabbitmq-virtual-host) | The virtual host to use when connecting to the broker. Type: string | false | |