Reactive Messaging RabbitMQ Connector Reference Documentation

本指南是 Getting Started with RabbitMQ 的配套指南。它更详细地解释了用于反应式消息传递的 RabbitMQ 连接器的配置和使用方法。

This guide is the companion from the Getting Started with RabbitMQ. It explains in more details the configuration and usage of the RabbitMQ connector for reactive messaging.

本文档并未涵盖连接器的所有详细信息。有关更多详细信息,请参阅 SmallRye Reactive Messaging website

This documentation does not cover all the details of the connector. Refer to the SmallRye Reactive Messaging website for further details.

RabbitMQ 连接器允许 Quarkus 应用程序使用 AMQP 0.9.1 协议发送和接收消息。可以在 the AMQP 0.9.1 specification 中找到有关该协议的更多详细信息。

The RabbitMQ connector allows Quarkus applications to send and receive messages using the AMQP 0.9.1 protocol. More details about the protocol can be found in the AMQP 0.9.1 specification.

RabbitMQ 连接器支持 AMQP 0-9-1,这与 AMQP 1.0 连接器所使用的 AMQP 1.0 协议非常不同。你可以将 AMQP 1.0 连接器与 RabbitMQ 搭配使用,如 AMQP 1.0 connector reference 中所述,尽管有 reduced functionality

The RabbitMQ connector supports AMQP 0-9-1, which is very different from the AMQP 1.0 protocol used by the AMQP 1.0 connector. You can use the AMQP 1.0 connector with RabbitMQ as described in the AMQP 1.0 connector reference, albeit with reduced functionality.

Unresolved directive in rabbitmq-reference.adoc - include::{includes}/extension-status.adoc[]

RabbitMQ connector extension

要使用该连接器,你需要添加 quarkus-messaging-rabbitmq 扩展。

To use the connector, you need to add the quarkus-messaging-rabbitmq extension.

你可以使用以下方式将扩展添加到你的项目:

You can add the extension to your project using:

> ./mvnw quarkus:add-extensions -Dextensions="quarkus-messaging-rabbitmq"

或者只需将以下依赖项添加到你的项目中:

Or just add the following dependency to your project:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-rabbitmq</artifactId>
</dependency>

添加到你的项目后,你可以通过配置 connector 属性将 channels 映射到 RabbitMQ 交换或者队列:

Once added to your project, you can map channels to RabbitMQ exchanges or queues by configuring the connector attribute:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-rabbitmq

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-rabbitmq

outgoing 通道映射到 RabbitMQ 交换,而 incoming 通道映射到 RabbitMQ 队列,因为代理需要。

outgoing channels are mapped to RabbitMQ exchanges and incoming channels are mapped to RabbitMQ queues as required by the broker.

Configuring the RabbitMQ Broker access

RabbitMQ 连接器连接到 RabbitMQ 代理。要配置代理的位置和凭证,请在 application.properties 中添加以下属性:

The RabbitMQ connector connects to RabbitMQ brokers. To configure the location and credentials of the broker, add the following properties in the application.properties:

rabbitmq-host=amqp (1)
rabbitmq-port=5672 (2)
rabbitmq-username=my-username (3)
rabbitmq-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-rabbitmq (5)
1 Configures the broker host name. You can do it per channel (using the host attribute) or globally using rabbitmq-host
2 Configures the broker port. You can do it per channel (using the port attribute) or globally using rabbitmq-port. The default is 5672.
3 Configures the broker username if required. You can do it per channel (using the username attribute) or globally using rabbitmq-username.
4 Configures the broker password if required. You can do it per channel (using the password attribute) or globally using rabbitmq-password.
5 Instructs the prices channel to be managed by the RabbitMQ connector

在开发模式下运行测试时,Dev Services for RabbitMQ 会自动启动一个 RabbitMQ 代理。

In dev mode and when running tests, Dev Services for RabbitMQ automatically starts a RabbitMQ broker.

Receiving RabbitMQ messages

假设你的应用程序接收了 Message<Double> 。你可以直接使用有效负载:

Let’s imagine your application receives Message<Double>. You can consume the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class RabbitMQPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

或者,你可以检索 Message<Double>:

Or, you can retrieve the Message<Double>:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class RabbitMQPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the RabbitMQ message as `accepted`.
        return price.ack();
    }

}

Inbound Metadata

来自 RabbitMQ 的消息在元数据中包含一个 IncomingRabbitMQMetadata 实例。

Messages coming from RabbitMQ contain an instance of IncomingRabbitMQMetadata in the metadata.

Optional<IncomingRabbitMQMetadata> metadata = incoming.getMetadata(IncomingRabbitMQMetadata.class);
metadata.ifPresent(meta -> {
    final Optional<String> contentEncoding = meta.getContentEncoding();
    final Optional<String> contentType = meta.getContentType();
    final Optional<String> correlationId = meta.getCorrelationId();
    final Optional<ZonedDateTime> creationTime = meta.getCreationTime(ZoneId.systemDefault());
    final Optional<Integer> priority = meta.getPriority();
    final Optional<String> replyTo = meta.getReplyTo();
    final Optional<String> userId = meta.getUserId();

    // Access a single String-valued header
    final Optional<String> stringHeader = meta.getHeader("my-header", String.class);

    // Access all headers
    final Map<String,Object> headers = meta.getHeaders();
    // ...
});

Deserialization

连接器将传入的 RabbitMQ 消息转换为 Reactive Messaging Message<T> 实例。有效负载类型 T 取决于 RabbitMQ 接收的消息包络 content_typecontent_encoding 属性的值。

The connector converts incoming RabbitMQ Messages into Reactive Messaging Message<T> instances. The payload type T depends on the value of the RabbitMQ received message Envelope content_type and content_encoding properties.

content_encoding content_type T

Value present

n/a

byte[]

No value

text/plain

String

No value

application/json

a JSON element which can be a JsonArray, JsonObject, String, …​etc if the buffer contains an array, object, string, …​etc

No value

Anything else

byte[]

如果你使用此 RabbitMQ 连接器(出站连接器)发送对象,则它们将被编码为 JSON 并发送,其中 content_type 设置为 application/json。你可以使用(Vert.x)JSON 对象来接收此有效负载,然后将其映射到你想要的类对象:

If you send objects with this RabbitMQ connector (outbound connector), they are encoded as JSON and sent with content_type set to application/json. You can receive this payload using (Vert.x) JSON Objects, and then map it to the object class you want:

@ApplicationScoped
public static class Generator {

    @Outgoing("to-rabbitmq")
    public Multi<Price> prices() { (1)
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
                .map(l -> new Price().setPrice(count.incrementAndGet()))
                .onOverflow().drop();
    }

}

@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-rabbitmq")
    public void consume(JsonObject p) { (2)
        Price price = p.mapTo(Price.class); (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 The Price instances are automatically encoded to JSON by the connector
2 You can receive it using a JsonObject
3 Then, you can reconstruct the instance using the mapTo method

mapTo 方法使用 Quarkus Jackson 映射器。请查看 this guide 了解有关映射器配置的详细信息。

The mapTo method uses the Quarkus Jackson mapper. Check this guide to learn more about the mapper configuration.

Acknowledgement

与 RabbitMQ 消息关联的反应式消息消息被确认时,它会通知代理消息已 accepted

When a Reactive Messaging Message associated with a RabbitMQ Message is acknowledged, it informs the broker that the message has been accepted.

是否需要明确确认消息取决于通道的 auto-acknowledgement 设置;如果设置为 true,则你的消息将在收到时自动确认。

Whether you need to explicitly acknowledge the message depends on the auto-acknowledgement setting for the channel; if that is set to true then your message will be automatically acknowledged on receipt.

Failure Management

如果从 RabbitMQ 消息生成的被 nack 的消息,则应用失败策略。RabbitMQ 连接器支持三种策略,由失败策略通道设置控制:

If a message produced from a RabbitMQ message is nacked, a failure strategy is applied. The RabbitMQ connector supports three strategies, controlled by the failure-strategy channel setting:

  • fail - fail the application; no more RabbitMQ messages will be processed. The RabbitMQ message is marked as rejected.

  • accept - this strategy marks the RabbitMQ message as accepted. The processing continues ignoring the failure.

  • reject - this strategy marks the RabbitMQ message as rejected (default). The processing continues with the next message.

Sending RabbitMQ messages

Serialization

在发送 Message<T> 时,连接器将此消息转换为 RabbitMQ 消息。有效负载转换为 RabbitMQ 消息正文。

When sending a Message<T>, the connector converts the message into a RabbitMQ Message. The payload is converted to the RabbitMQ Message body.

T RabbitMQ Message Body

primitive types or UUID/String

String value with content_type set to text/plain

JsonObject or JsonArray

Serialized String payload with content_type set to application/json

io.vertx.mutiny.core.buffer.Buffer

Binary content, with content_type set to application/octet-stream

byte[]

Binary content, with content_type set to application/octet-stream

Any other class

The payload is converted to JSON (using a Json Mapper) then serialized with content_type set to application/json

如果无法将消息有效内容序列化为 JSON,则中断消息 nacked

If the message payload cannot be serialized to JSON, the message is nacked.

Outbound Metadata

在发送 Messages 时,你可以添加 OutgoingRabbitMQMetadata 的一个实例来影响 RabbitMQ 处理消息的方式。例如,你可以配置路由键、时间戳和标头:

When sending Messages, you can add an instance of OutgoingRabbitMQMetadata to influence how the message is handled by RabbitMQ. For example, you can configure the routing key, timestamp and headers:

final OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder()
        .withHeader("my-header", "xyzzy")
        .withRoutingKey("urgent")
        .withTimestamp(ZonedDateTime.now())
        .build();

// Add `metadata` to the metadata of the outgoing message.
return Message.of("Hello", Metadata.of(metadata));

Acknowledgement

默认情况下,在代理确认消息时确认响应式消息传递 Message

By default, the Reactive Messaging Message is acknowledged when the broker acknowledges the message.

Configuring the RabbitMQ Exchange/Queue

你可以使用通道配置上的属性来配置与通道关联的 RabbitMQ 交换器或队列。incoming 通道映射到 RabbitMQ queuesoutgoing 通道映射到 RabbitMQ 交换器。例如:

You can configure the RabbitMQ exchange or queue associated with a channel using properties on the channel configuration. incoming channels are mapped to RabbitMQ queues and outgoing channels are mapped to RabbitMQ exchanges. For example:

mp.messaging.incoming.prices.connector=smallrye-rabbitmq
mp.messaging.incoming.prices.queue.name=my-queue

mp.messaging.outgoing.orders.connector=smallrye-rabbitmq
mp.messaging.outgoing.orders.exchange.name=my-order-queue

如果未设置 exchange.namequeue.name 属性,连接器将使用通道名称。

If the exchange.name or queue.name attribute is not set, the connector uses the channel name.

要使用现有队列,你需要配置 name 并将交换器或队列的 declare 属性设置为 false。例如,如果你有一个使用 people 交换器和队列配置的 RabbitMQ 代理,则需要以下配置:

To use an existing queue, you need to configure the name and set the exchange’s or queue’s declare property to false. For example, if you have a RabbitMQ broker configured with a people exchange and queue, you need the following configuration:

mp.messaging.incoming.people.connector=smallrye-rabbitmq
mp.messaging.incoming.people.queue.name=people
mp.messaging.incoming.people.queue.declare=false

mp.messaging.outgoing.people.connector=smallrye-rabbitmq
mp.messaging.outgoing.people.exchange.name=people
mp.messaging.outgoing.people.exchange.declare=false

Execution model and Blocking processing

响应式消息传递在 I/O 线程中调用你的方法。有关此主题的更多详细信息,请查看 Quarkus Reactive Architecture documentation。但是,你通常需要将响应式消息传递与数据库交互等阻塞式处理结合使用。为此,你需要使用 @Blocking 注解,指示处理为 blocking 且不应该在调用者线程上运行。

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. But, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation indicating that the processing is blocking and should not be run on the caller thread.

例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入有效负载存储到数据库:

For example, The following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

有 2 个 @Blocking 注释:

There are 2 @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

它们具有相同的效果。因此,可以同时使用。第一个提供更精细的调整,例如要使用的工作器池以及是否保留顺序。第二个也与 Quarkus 的其他响应式特性结合使用,使用默认工作器池并保留顺序。

They have the same effect. Thus, you can use both. The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.

@RunOnVirtualThread

有关在 Java {@s7} 上运行阻塞处理,请参阅 {@s8}。

For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.

Customizing the underlying RabbitMQ client

连接器在下面使用 Vert.x RabbitMQ 客户端。关于此客户端的更多详细信息,请参见 Vert.x website

The connector uses the Vert.x RabbitMQ client underneath. More details about this client can be found in the Vert.x website.

可以通过生成 RabbitMQOptions 的实例来自定义基础客户端配置,如下所示:

You can customize the underlying client configuration by producing an instance of RabbitMQOptions as follows:

@Produces
@Identifier("my-named-options")
public RabbitMQOptions getNamedOptions() {
  PemKeyCertOptions keycert = new PemKeyCertOptions()
        .addCertPath("./tls/tls.crt")
        .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
  // You can use the produced options to configure the TLS connection
  return new RabbitMQOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .setUser("user1")
        .setPassword("password1")
        .setHost("localhost")
        .setPort(5672)
        .setVirtualHost("vhost1")
        .setConnectionTimeout(6000) // in milliseconds
        .setRequestedHeartbeat(60) // in seconds
        .setHandshakeTimeout(6000) // in milliseconds
        .setRequestedChannelMax(5)
        .setNetworkRecoveryInterval(500) // in milliseconds
        .setAutomaticRecoveryEnabled(true);
}

此实例会被检索并用于配置连接器所用的客户端。您需要使用 client-options-name 属性指示客户端的名称:

This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the client using the client-options-name attribute:

mp.messaging.incoming.prices.client-options-name=my-named-options

Health reporting

如果你使用带有 quarkus-smallrye-health 扩展的 RabbitMQ 连接器,则它有助于准备和运行状况探测。RabbitMQ 连接器报告连接器管理的每个通道的准备和运行状况。

If you use the RabbitMQ connector with the quarkus-smallrye-health extension, it contributes to the readiness and liveness probes. The RabbitMQ connector reports the readiness and liveness of each channel managed by the connector.

要禁用运行状况报告,请将通道的 health-enabled 属性设置为 false。

To disable health reporting, set the health-enabled attribute for the channel to false.

在入站(从 RabbitMQ 接收消息)侧,检查会验证接收器是否已连接到代理。

On the inbound side (receiving messages from RabbitMQ), the check verifies that the receiver is connected to the broker.

在出站(将记录发送到 RabbitMQ)侧,检查会验证发送器未断开与代理的连接;发送器 may 仍处于初始化状态(尚未尝试连接),但将其视为 live/ready。

On the outbound side (sending records to RabbitMQ), the check verifies that the sender is not disconnected from the broker; the sender may still be in an initialised state (connection not yet attempted), but this is regarded as live/ready.

请注意,消息处理失败会标记该消息,然后由 failure-strategy 处理。由 failure-strategy 报告失败并影响检查结果。fail 失败策略会报告失败,因此检查将报告故障。

Note that a message processing failures nacks the message, which is then handled by the failure-strategy. It’s the responsibility of the failure-strategy to report the failure and influence the outcome of the checks. The fail failure strategy reports the failure, and so the check will report the fault.

Dynamic Credentials

Quarkus 和 RabbitMQ 连接器支持 Vault’s RabbitMQ secrets engine 以生成短期动态凭证。这允许 Vault 定期创建和取消 RabbitMQ 凭证。

Quarkus and the RabbitMQ connector support Vault’s RabbitMQ secrets engine for generating short-lived dynamic credentials. This allows Vault to create and retire RabbitMQ credentials on a regular basis.

首先,我们需要启用 Vault 的 rabbitmq 秘密引擎,使用 RabbitMQ 的连接和认证信息对其进行配置,并创建一个 Vault 角色 my-role(用正在运行 RabbitMQ 容器的实际主机替换 10.0.0.3):

First we need to enable Vault’s rabbitmq secret engine, configure it with RabbitMQ’s connection and authentication information, and create a Vault role my-role (replace 10.0.0.3 by the actual host that is running the RabbitMQ container):

vault secrets enable rabbitmq

vault write rabbitmq/config/connection \
    connection_uri=http://10.0.0.3:15672 \
    username=guest \
    password=guest

vault write rabbitmq/roles/my-role \
    vhosts='{"/":{"write": ".*", "read": ".*"}}'

对于此用例,上面配置的用户 guest 需要是具有创建凭证功能的 RabbitMQ 管理员用户。

For this use case, user guest configured above needs to be a RabbitMQ admin user with the capability to create credentials.

然后,我们需要在路径 rabbitmq/creds/my-role 上为 Quarkus 应用程序提供读取功能。

Then we need to give a read capability to the Quarkus application on path rabbitmq/creds/my-role.

cat <<EOF | vault policy write vault-rabbitmq-policy -
path "secret/data/myapps/vault-rabbitmq-test/*" {
  capabilities = ["read"]
}
path "rabbitmq/creds/my-role" {
  capabilities = [ "read" ]
}
EOF

现在 Vault 知道如何在 RabbitMQ 中创建用户,我们需要配置 Quarkus 以使用 RabbitMQ 的凭证提供程序。

Now that Vault knows how to create users in RabbitMQ, we need to configure Quarkus to use a credentials-provider for RabbitMQ.

首先,我们告诉 Quarkus 使用名为 rabbitmq 的凭据提供程序请求动态凭据。

First we tell Quarkus to request dynamic credentials using a credentials-provider named rabbitmq.

quarkus.rabbitmq.credentials-provider = rabbitmq

接下来,我们配置 rabbitmq 凭据提供程序。credentials-role 选项必须设置为我们在 Vault 中创建的角色的名称,在本例中为 my-rolecredentials-mount 选项必须设置为 rabbitmq

Next we configure the rabbitmq credentials provider. The credentials-role option must be set to the name of the role we created in Vault, in our case my-role. The credentials-mount option must be set to rabbitmq.

quarkus.vault.credentials-provider.rabbitmq.credentials-role=my-role
quarkus.vault.credentials-provider.rabbitmq.credentials-mount=rabbitmq

credentials-mount 直接用作 Vault 中机密引擎的装载。这里我们使用 rabbitmq 的默认装载路径。如果 RabbitMQ 机密引擎装载在自定义路径中,则 credentials-mount 选项必须改为该路径。

The credentials-mount is used directly as the mount of the secret engine in Vault. Here we are using the default mount path of rabbitmq. If the RabbitMQ secret engine was mounted at a custom path, the credentials-mount option must be set to that path instead.

RabbitMQ Connector Configuration Reference

Incoming channel configuration

Attribute (alias) Description Mandatory Default

[role="no-hyphens"]username

[role="no-hyphens"](rabbitmq-username)

The username used to authenticate to the broker

Type: string

false

[role="no-hyphens"]password

[role="no-hyphens"](rabbitmq-password)

The password used to authenticate to the broker

Type: string

false

[role="no-hyphens"]host

[role="no-hyphens"](rabbitmq-host)

The broker hostname

Type: string

false

localhost

[role="no-hyphens"]port

[role="no-hyphens"](rabbitmq-port)

The broker port

Type: int

false

5672

[role="no-hyphens"]ssl

[role="no-hyphens"](rabbitmq-ssl)

Whether the connection should use SSL

Type: boolean

false

false

[role="no-hyphens"]trust-all

[role="no-hyphens"](rabbitmq-trust-all)

Whether to skip trust certificate verification

Type: boolean

false

false

[role="no-hyphens"]trust-store-path

[role="no-hyphens"](rabbitmq-trust-store-path)

The path to a JKS trust store

Type: string

false

[role="no-hyphens"]trust-store-password

[role="no-hyphens"](rabbitmq-trust-store-password)

The password of the JKS trust store

Type: string

false

[role="no-hyphens"]credentials-provider-name

[role="no-hyphens"](rabbitmq-credentials-provider-name)

The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client

Type: string

false

[role="no-hyphens"]connection-timeout

The TCP connection timeout (ms); 0 is interpreted as no timeout

Type: int

false

60000

[role="no-hyphens"]handshake-timeout

The AMQP 0-9-1 protocol handshake timeout (ms)

Type: int

false

10000

[role="no-hyphens"]automatic-recovery-enabled

Whether automatic connection recovery is enabled

Type: boolean

false

false

[role="no-hyphens"]automatic-recovery-on-initial-connection

Whether automatic recovery on initial connections is enabled

Type: boolean

false

true

[role="no-hyphens"]reconnect-attempts

[role="no-hyphens"](rabbitmq-reconnect-attempts)

The number of reconnection attempts

Type: int

false

100

[role="no-hyphens"]reconnect-interval

[role="no-hyphens"](rabbitmq-reconnect-interval)

The interval (in seconds) between two reconnection attempts

Type: int

false

10

[role="no-hyphens"]network-recovery-interval

How long (ms) will automatic recovery wait before attempting to reconnect

Type: int

false

5000

[role="no-hyphens"]user

The AMQP username to use when connecting to the broker

Type: string

false

guest

[role="no-hyphens"]include-properties

Whether to include properties when a broker message is passed on the event bus

Type: boolean

false

false

[role="no-hyphens"]requested-channel-max

The initially requested maximum channel number

Type: int

false

2047

[role="no-hyphens"]requested-heartbeat

The initially requested heartbeat interval (seconds), zero for none

Type: int

false

60

[role="no-hyphens"]use-nio

Whether usage of NIO Sockets is enabled

Type: boolean

false

false

[role="no-hyphens"]virtual-host

[role="no-hyphens"](rabbitmq-virtual-host)

The virtual host to use when connecting to the broker

Type: string

false

/

[role="no-hyphens"]exchange.name

The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used

Type: string

false

[role="no-hyphens"]exchange.durable

Whether the exchange is durable

Type: boolean

false

true

[role="no-hyphens"]exchange.auto-delete

Whether the exchange should be deleted after use

Type: boolean

false

false

[role="no-hyphens"]exchange.type

The exchange type: direct, fanout, headers or topic (default)

Type: string

false

topic

[role="no-hyphens"]exchange.declare

Whether to declare the exchange; set to false if the exchange is expected to be set up independently

Type: boolean

false

true

[role="no-hyphens"]tracing.enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

[role="no-hyphens"]tracing.attribute-headers

A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true

Type: string

false

``

[role="no-hyphens"]queue.name

The queue from which messages are consumed.

Type: string

true

[role="no-hyphens"]queue.durable

Whether the queue is durable

Type: boolean

false

true

[role="no-hyphens"]queue.exclusive

Whether the queue is for exclusive use

Type: boolean

false

false

[role="no-hyphens"]queue.auto-delete

Whether the queue should be deleted after use

Type: boolean

false

false

[role="no-hyphens"]queue.declare

Whether to declare the queue and binding; set to false if these are expected to be set up independently

Type: boolean

false

true

[role="no-hyphens"]queue.ttl

If specified, the time (ms) for which a message can remain in the queue undelivered before it is dead

Type: long

false

[role="no-hyphens"]queue.single-active-consumer

If set to true, only one consumer can actively consume messages

Type: boolean

false

false

[role="no-hyphens"]queue.x-queue-type

If automatically declare queue, we can choose different types of queue [quorum, classic, stream]

Type: string

false

classic

[role="no-hyphens"]queue.x-queue-mode

If automatically declare queue, we can choose different modes of queue [lazy, default]

Type: string

false

default

[role="no-hyphens"]max-incoming-internal-queue-size

The maximum size of the incoming internal queue

Type: int

false

[role="no-hyphens"]connection-count

The number of RabbitMQ connections to create for consuming from this queue. This might be necessary to consume from a sharded queue with a single client.

Type: int

false

1

[role="no-hyphens"]auto-bind-dlq

Whether to automatically declare the DLQ and bind it to the binder DLX

Type: boolean

false

false

[role="no-hyphens"]dead-letter-queue-name

The name of the DLQ; if not supplied will default to the queue name with '.dlq' appended

Type: string

false

[role="no-hyphens"]dead-letter-exchange

A DLX to assign to the queue. Relevant only if auto-bind-dlq is true

Type: string

false

DLX

[role="no-hyphens"]dead-letter-exchange-type

The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true

Type: string

false

direct

[role="no-hyphens"]dead-letter-routing-key

A dead letter routing key to assign to the queue; if not supplied will default to the queue name

Type: string

false

[role="no-hyphens"]dlx.declare

Whether to declare the dead letter exchange binding. Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently

Type: boolean

false

false

[role="no-hyphens"]dead-letter-queue-type

If automatically declare DLQ, we can choose different types of DLQ [quorum, classic, stream]

Type: string

false

classic

[role="no-hyphens"]dead-letter-queue-mode

If automatically declare DLQ, we can choose different modes of DLQ [lazy, default]

Type: string

false

default

[role="no-hyphens"]failure-strategy

The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are fail, accept, reject (default)

Type: string

false

reject

[role="no-hyphens"]broadcast

Whether the received RabbitMQ messages must be dispatched to multiple subscribers

Type: boolean

false

false

[role="no-hyphens"]auto-acknowledgement

Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement

Type: boolean

false

false

[role="no-hyphens"]keep-most-recent

Whether to discard old messages instead of recent ones

Type: boolean

false

false

[role="no-hyphens"]routing-keys

A comma-separated list of routing keys to bind the queue to the exchange

Type: string

false

#

[role="no-hyphens"]content-type-override

Override the content_type attribute of the incoming message, should be a valid MIME type

Type: string

false

[role="no-hyphens"]max-outstanding-messages

The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number

Type: int

false

Outgoing channel configuration

Attribute (alias) Description Mandatory Default

[role="no-hyphens"]automatic-recovery-enabled

Whether automatic connection recovery is enabled

Type: boolean

false

false

[role="no-hyphens"]automatic-recovery-on-initial-connection

Whether automatic recovery on initial connections is enabled

Type: boolean

false

true

[role="no-hyphens"]connection-timeout

The TCP connection timeout (ms); 0 is interpreted as no timeout

Type: int

false

60000

[role="no-hyphens"]default-routing-key

The default routing key to use when sending messages to the exchange

Type: string

false

``

[role="no-hyphens"]default-ttl

If specified, the time (ms) sent messages can remain in queues undelivered before they are dead

Type: long

false

[role="no-hyphens"]exchange.auto-delete

Whether the exchange should be deleted after use

Type: boolean

false

false

[role="no-hyphens"]exchange.declare

Whether to declare the exchange; set to false if the exchange is expected to be set up independently

Type: boolean

false

true

[role="no-hyphens"]exchange.durable

Whether the exchange is durable

Type: boolean

false

true

[role="no-hyphens"]exchange.name

The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used

Type: string

false

[role="no-hyphens"]exchange.type

The exchange type: direct, fanout, headers or topic (default)

Type: string

false

topic

[role="no-hyphens"]handshake-timeout

The AMQP 0-9-1 protocol handshake timeout (ms)

Type: int

false

10000

[role="no-hyphens"]host

[role="no-hyphens"](rabbitmq-host)

The broker hostname

Type: string

false

localhost

[role="no-hyphens"]include-properties

Whether to include properties when a broker message is passed on the event bus

Type: boolean

false

false

[role="no-hyphens"]max-inflight-messages

The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number

Type: long

false

1024

[role="no-hyphens"]max-outgoing-internal-queue-size

The maximum size of the outgoing internal queue

Type: int

false

[role="no-hyphens"]network-recovery-interval

How long (ms) will automatic recovery wait before attempting to reconnect

Type: int

false

5000

[role="no-hyphens"]password

[role="no-hyphens"](rabbitmq-password)

The password used to authenticate to the broker

Type: string

false

[role="no-hyphens"]port

[role="no-hyphens"](rabbitmq-port)

The broker port

Type: int

false

5672

[role="no-hyphens"]reconnect-attempts

[role="no-hyphens"](rabbitmq-reconnect-attempts)

The number of reconnection attempts

Type: int

false

100

[role="no-hyphens"]reconnect-interval

[role="no-hyphens"](rabbitmq-reconnect-interval)

The interval (in seconds) between two reconnection attempts

Type: int

false

10

[role="no-hyphens"]requested-channel-max

The initially requested maximum channel number

Type: int

false

2047

[role="no-hyphens"]requested-heartbeat

The initially requested heartbeat interval (seconds), zero for none

Type: int

false

60

[role="no-hyphens"]ssl

[role="no-hyphens"](rabbitmq-ssl)

Whether the connection should use SSL

Type: boolean

false

false

[role="no-hyphens"]tracing.attribute-headers

A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true

Type: string

false

``

[role="no-hyphens"]tracing.enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

[role="no-hyphens"]trust-all

[role="no-hyphens"](rabbitmq-trust-all)

Whether to skip trust certificate verification

Type: boolean

false

false

[role="no-hyphens"]trust-store-password

[role="no-hyphens"](rabbitmq-trust-store-password)

The password of the JKS trust store

Type: string

false

[role="no-hyphens"]trust-store-path

[role="no-hyphens"](rabbitmq-trust-store-path)

The path to a JKS trust store

Type: string

false

[role="no-hyphens"]credentials-provider-name

[role="no-hyphens"](rabbitmq-credentials-provider-name)

The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client

Type: string

false

[role="no-hyphens"]use-nio

Whether usage of NIO Sockets is enabled

Type: boolean

false

false

[role="no-hyphens"]user

The AMQP username to use when connecting to the broker

Type: string

false

guest

[role="no-hyphens"]username

[role="no-hyphens"](rabbitmq-username)

The username used to authenticate to the broker

Type: string

false

[role="no-hyphens"]virtual-host

[role="no-hyphens"](rabbitmq-virtual-host)

The virtual host to use when connecting to the broker

Type: string

false

/