Reactive Messaging AMQP 1.0 Connector Reference Documentation

本指南是 Getting Started with AMQP 1.0 的配套指南。它更详细地解释了 AMQP 连接器的配置和使用,用于响应式消息传递。

本文档并未涵盖连接器的所有详细信息。有关更多详细信息,请参阅 SmallRye Reactive Messaging website

AMQP 连接器允许 Quarkus 应用程序使用 AMQP 1.0 协议发送和接收消息。有关该协议的更多详细信息,请参阅 the AMQP 1.0 specification。请务必注意,AMQP 1.0 和 AMQP 0.9.1(RabbitMQ 实现)不兼容。查看 Using RabbitMQ 以了解详细信息。

AMQP connector extension

若要使用连接器,你需要添加 quarkus-messaging-amqp 扩展。

你可以使用以下方式将扩展添加到你的项目:

CLI
quarkus extension add {add-extension-extensions}
Maven
./mvnw quarkus:add-extension -Dextensions='{add-extension-extensions}'
Gradle
./gradlew addExtension --extensions='{add-extension-extensions}'

或者只需将以下依赖项添加到你的项目中:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-amqp")

添加到您的项目后,您可以通过配置 connector 属性,将 channels 映射到 AMQP 地址:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
Connector auto-attachment

如果类路径中只有一个连接器,则可以省略 connector 属性配置。Quarkus 自动将 orphan 通道与类路径上找到的(唯一的)连接器关联起来。Orphans 通道是无下游使用者的传出通道或无上游生产者的传入通道。 可以使用以下方法禁用此自动附加功能:

quarkus.messaging.auto-connector-attachment=false

Configuring the AMQP Broker access

AMQP 连接器连接到 AMQP 1.0 代理,例如 Apache ActiveMQ 或 Artemis。若要配置代理的位置和凭证,请在 application.properties 中添加以下属性:

amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 配置代理/路由器主机名。你可以针对每个通道(使用 host 属性)或全局(使用 amqp-host 属性)配置。
2 配置代理/路由器端口。你可以针对每个通道(使用 port 属性)或全局(使用 amqp-port 属性)配置。默认为 5672
3 如果需要,配置代理/路由器用户名。你可以针对每个通道(使用 username 属性)或全局(使用 amqp-username 属性)配置。
4 如果需要,配置代理/路由器密码。你可以针对每个通道(使用 password 属性)或全局(使用 amqp-password 属性)配置。
5 指示 AMQP 连接器管理价格通道

在开发模式和运行测试时, Dev Services for AMQP 自动启动 AMQP 代理。

Receiving AMQP messages

假设你的应用程序接收了 Message<Double> 。你可以直接使用有效负载:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

或者,你可以检索 Message<Double>:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}

Inbound Metadata

来自 AMQP 的消息在元数据中包含一个 IncomingAmqpMetadata 实例。

Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});

Deserialization

连接器将传入的 AMQP 消息转换为 Reactive Messaging Message<T> 实例。 T 取决于接收的 AMQP 消息的 body

AMQP Type System 定义了受支持的类型。

AMQP Body Type <T>

包含 AMQP Primitive Type 的 AMQP 值

the corresponding Java type

使用 Binary 类型的 AMQP 值

byte[]

AMQP Sequence

List

AMQP 数据(带有二进制内容),并且 content-type 设置为 application/json

JsonObject

带有不同 content-type 的 AMQP 数据

byte[]

如果你使用此 AMQP 连接器(出站连接器)发送对象,该对象将被编码为 JSON 并作为二进制数据发送。content-type 设置为 application/json。因此,你可以按以下方式重建对象:

import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") (1)
    public void consume(JsonObject p) {   (2)
        Price price = p.mapTo(Price.class);  (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 Price 实例将由连接器自动编码为 JSON
2 你可以使用 JsonObject 接收它
3 然后,你可以使用 mapTo 方法重建该实例

mapTo 方法使用 Quarkus Jackson 映射器。请查看 this guide 了解有关映射器配置的详细信息。

Acknowledgement

当与 AMQP 消息关联的 Reactive Messaging 消息得到确认时,它会通知代理此消息已 accepted

Failure Management

如果来自 AMQP 消息的消息 nacked,则会应用故障策略。AMQP 连接器支持六种策略:

  • fail - 使应用失败;将不会再处理任何 AMQP 消息(默认)。AMQP 消息标记为已拒绝。

  • accept - 此策略将 AMQP 消息标记为 accepted。处理将继续,忽略故障。参见 accepted delivery state documentation

  • release - 此策略将 AMQP 消息标记为 released。处理将继续执行下一条消息。代理可以重新发送消息。参见 released delivery state documentation

  • reject - 此策略将 AMQP 消息标记为已拒绝。处理将继续执行下一条消息。参见 rejected delivery state documentation

  • modified-failed - 此策略将 AMQP 消息标记为 modified,并指示其失败(具有 delivery-failed 属性)。处理将继续执行下一条消息,但代理可以尝试重新发送消息。参见 modified delivery state documentation

  • modified-failed-undeliverable-here - 此策略将 AMQP 消息标记为 modified,并指示其失败(具有 delivery-failed 属性)。它还指示该应用无法处理此消息,这意味着代理不会尝试将消息重新发送到此节点。处理将继续执行下一条消息。参见 modified delivery state documentation

Sending AMQP messages

Serialization

发送 Message<T> 时,连接器会将该消息转换为 AMQP 消息。负载被转换为 AMQP 消息 body

T AMQP Message Body

primitive types or String

带有负载的 AMQP 值

Instant or UUID

使用对应 AMQP 类型的 AMQP 值

JsonObject or JsonArray

使用二进制内容的 AMQP 数据。content-type 设置为 application/json

io.vertx.mutiny.core.buffer.Buffer

使用二进制内容的 AMQP 数据。未设置 content-type

Any other class

有效内容转换为 JSON(使用 Json 映射器)。结果使用 binary 内容封装到 AMQP 数据中。将 content-type 设置为 application/json

如果无法将消息有效内容序列化为 JSON,则中断消息 nacked

Outbound Metadata

发送 Messages 时,可以添加 OutgoingAmqpMetadata 实例以影响将消息发送到 AMQP的方式。例如,您可以配置主题和属性:

 OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withDurable(true)
    .withSubject("my-subject")
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

Dynamic address names

有时,按需选择消息的目标是可取的。在这种情况下,不应在应用配置文档中配置地址,而应改用发送元数据来设置地址。

例如,可以根据传入消息发送到动态地址:

String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withAddress(addressName)
    .withDurable(true)
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

为了能够根据每条消息设置地址,连接器正在使用 anonymous sender

Acknowledgement

默认情况下,Reactive Messaging Message 在代理确认消息后确认。使用路由器时,可能无法启用此确认。在这种情况下,配置 auto-acknowledgement 属性以在将消息发送到路由器后立即确认消息。

如果 AMQP 消息被代理拒绝/释放/修改(或无法成功发送),则消息会被拒绝。

Back Pressure and Credits

反压由 AMQP credits 处理。发送连接器仅请求允许授信的量。当授信量达到 0 时,它会一直等待(以非阻塞方式),直到代理向 AMQP 发送方授予更多授信。

Configuring the AMQP address

可以使用 address 属性配置 AMQP 地址:

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue

如果未设置 address 属性,则连接器将使用通道名称。

为了使用现有队列,需要配置 addresscontainer-id 以及(可选)link-name 属性。例如,如果配置了 Apache Artemis 代理:

<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>

您需要以下配置:

mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

如果队列名称不是通道名称,可能需要配置 link-name 属性:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

为了使用 MULTICAST 队列,需要提供 FQQN(完全限定的队列名称),而不仅仅是队列名称:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people

有关 AMQP 地址模型的更多详细信息,请参见 Artemis documentation

Execution model and Blocking processing

响应式消息传递在 I/O 线程中调用你的方法。有关此主题的更多详细信息,请查看 Quarkus Reactive Architecture documentation。但是,你通常需要将响应式消息传递与数据库交互等阻塞式处理结合使用。为此,你需要使用 @Blocking 注解,指示处理为 blocking 且不应该在调用者线程上运行。

例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入有效负载存储到数据库:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

有 2 个 @Blocking 注释:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

它们具有相同的效果。因此,可以同时使用。第一个提供更精细的调整,例如要使用的工作器池以及是否保留顺序。第二个也与 Quarkus 的其他响应式特性结合使用,使用默认工作器池并保留顺序。

@RunOnVirtualThread

有关在 Java {@s7} 上运行阻塞处理,请参阅 {@s8}。

@Transactional

如果用 {@s9} 对您的方法进行了注解,则它将自动被视为`{@s11}`,即使没有用 {@s10} 对方法进行注解。

Customizing the underlying AMQP client

连接器在底层使用 Vert.x AMQP 客户端。有关此客户端的更多详细信息,请参见 Vert.x website

可以通过生成一个 AmqpClientOptions 实例来定制底层客户端配置,如下所示:

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // You can use the produced options to configure the TLS connection
  PemKeyCertOptions keycert = new PemKeyCertOptions()
    .addCertPath("./tls/tls.crt")
    .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
  return new AmqpClientOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .addEnabledSaslMechanism("EXTERNAL")
        .setHostnameVerificationAlgorithm("") // Disables the hostname verification. Defaults is "HTTPS"
        .setConnectTimeout(30000)
        .setReconnectInterval(5000)
        .setContainerId("my-container");
}

此实例会被检索并用于配置连接器所用的客户端。您需要使用 client-options-name 属性指示客户端的名称:

mp.messaging.incoming.prices.client-options-name=my-named-options

如果您经常与代理断开连接,也可以使用 AmqpClientOptions 在需要永久保持 AMQP 连接时设置心跳。某些代理可能在一定空闲超时后终止 AMQP 连接。您可以提供一个心跳值,Vert.x Proton 客户端将在向远程对等方打开传输内容时使用该值来通告空闲超时。

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // set a heartbeat of 30s (in milliseconds)
  return new AmqpClientOptions()
        .setHeartbeat(30000);
}

Health reporting

如果您对带有 quarkus-smallrye-health 扩展的 AMQP 连接器使用该扩展,那么它将有助于就绪性和健康检查。AMQP 连接器会报告连接器所管理的每个通道的就绪性和健康状况。目前,AMQP 连接器对就绪性和健康检查使用相同的逻辑。

要禁用运行状况报告,请将通道的 health-enabled 属性设置为 false。在入站端(从 AMQP 接收消息),检查将验证接收器是否已附加到代理。在出站端(向 AMQP 发送记录),检查将验证发送器是否已附加到代理。

请注意,消息处理失败会对消息发出否定确认,然后由 failure-strategy 处理该消息。failure-strategy 负责报告失败并影响检查结果。fail 失败策略会报告失败,因此检查会报告故障。

Using RabbitMQ

此连接器适用于 AMQP 1.0。RabbitMQ 实现 AMQP 0.9.1。RabbitMQ 默认不提供 AMQP 1.0,但有一个可用于该版本的插件。要将 RabbitMQ 与此连接器配合使用,请启用并配置 AMQP 1.0 插件。

尽管存在该插件,但一些 AMQP 1.0 功能无法与 RabbitMQ 一起使用。因此,我们推荐以下配置。

从 RabbitMQ 接收消息:

  • Set durable to false

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false

向 RabbitMQ 发送消息:

  • 设置目标地址(不支持匿名发件人)

  • set use-anonymous-sender to false

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false

因此,在使用 RabbitMQ 时无法动态更改目标(使用消息元数据)。

Receiving Cloud Events

AMQP 连接器支持 Cloud Events。当连接器检测到 structuredbinary Cloud 事件时,它会在 Message 的元数据中添加一个 IncomingCloudEventMetadata<T>.IncomingCloudEventMetadata 包含对强制和可选 Cloud 事件属性的访问器。

如果连接器无法提取 Cloud 事件元数据,它将在没有元数据的情况下发送消息。

有关接收 Cloud 事件的更多信息,请参见 SmallRye Reactive Messaging 文档中的 Receiving Cloud Events

Sending Cloud Events

AMQP 连接器支持 Cloud Events。如果满足以下条件,连接器将以 Cloud 事件的形式发送出站记录:

  • 消息元数据包含一个 io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata 实例,

  • 通道配置定义了 cloud-events-typecloud-events-source 属性。

有关发送 Cloud 事件的更多信息,请参见 SmallRye Reactive Messaging 文档中的 Sending Cloud Events

AMQP Connector Configuration Reference

Quarkus specific configuration

Unresolved include directive in modules/ROOT/pages/amqp-reference.adoc - include::../../../target/quarkus-generated-doc/config/quarkus-messaging-amqp.adoc[]

Incoming channel configuration

Attribute (alias) Description Mandatory Default

[role="no-hyphens"]address

AMQP 地址。如果未设置,将使用通道名称类型:string

false

[role="no-hyphens"]auto-acknowledgement

在接收到 AMQP 消息时是否必须对其进行确认类型:boolean

false

false

[role="no-hyphens"]broadcast

接收到的 AMQP 消息是否必须分派给多个 subscribers_类型:_boolean

false

false

[role="no-hyphens"]capabilities

发送方或接收方客户端提出的功能的逗号分隔列表。类型:string

false

[role="no-hyphens"]client-options-name

[role="no-hyphens"](amqp-client-options-name)

用于自定义 AMQP 客户端配置的 AMQP 客户端选项 bean 的名称类型:string

false

[role="no-hyphens"]cloud-events

启用(默认)或禁用 Cloud Event 支持。如果在 incoming 通道上启用,则连接器将分析传入记录并尝试创建 Cloud Event 元数据。如果在 outgoing 上启用,则连接器将消息包括 Cloud Event 元数据的情况下发送为 Cloud Event。类型: boolean

false

true

[role="no-hyphens"]connect-timeout

[role="no-hyphens"](amqp-connect-timeout)

连接超时(以毫秒为单位)类型:int

false

1000

[role="no-hyphens"]container-id

AMQP 容器 ID 类型:string

false

[role="no-hyphens"]durable

AMQP 订阅是否持久类型:boolean

false

false

[role="no-hyphens"]failure-strategy

指定在从 AMQP 消息生成的消息拒绝时应用的故障策略。可接受的值为 fail(默认值)、acceptreleaserejectmodified-failed、`modified-failed-undeliverable-here`类型:string

false

fail

[role="no-hyphens"]health-timeout

确定与代理的连接是否仍为可用性检查而建立的最大等待时间(以秒为单位)。经过该阈值后,该检查将被视为失败。类型:int

false

3

[role="no-hyphens"]host

[role="no-hyphens"](amqp-host)

The broker hostname

Type: string

false

localhost

[role="no-hyphens"]link-name

链接的名称。如果未设置,将使用通道名称。类型:string

false

[role="no-hyphens"]password

[role="no-hyphens"](amqp-password)

用于对代理进行身份验证的密码类型:string

false

[role="no-hyphens"]port

[role="no-hyphens"](amqp-port)

The broker port

Type: int

false

5672

[role="no-hyphens"]reconnect-attempts

[role="no-hyphens"](amqp-reconnect-attempts)

重新连接尝试的次数类型:int

false

100

[role="no-hyphens"]reconnect-interval

[role="no-hyphens"](amqp-reconnect-interval)

两次重新连接尝试之间的间隔时间(以秒为单位)类型:int

false

10

[role="no-hyphens"]sni-server-name

[role="no-hyphens"](amqp-sni-server-name)

如果设置,则显式覆盖用于 TLS SNI 服务器名的主机名类型:string

false

[role="no-hyphens"]selector

设置消息选择器。此属性用于在源终结点上定义 apache.org:selector-filter:string 筛选器,使用基于 SQL 的语法请求服务器筛选将哪些消息传递给接收方(如果相关服务器支持)。支持的精确功能和所需的语法可能因服务器而异。类型:string

false

[role="no-hyphens"]tracing-enabled

启用(默认)或禁用跟踪类型: boolean

false

true

[role="no-hyphens"]use-ssl

[role="no-hyphens"](amqp-use-ssl)

AMQP 连接是否使用 SSL/TLS 类型:boolean

false

false

[role="no-hyphens"]username

[role="no-hyphens"](amqp-username)

用于对代理进行身份验证的用户名类型:string

false

[role="no-hyphens"]virtual-host

[role="no-hyphens"](amqp-virtual-host)

如果设置,则配置用于连接 AMQP 打开帧和 TLS SNI 服务器名(如果使用 TLS)的主机名值类型:string

false

Outgoing channel configuration

Attribute (alias) Description Mandatory Default

[role="no-hyphens"]address

AMQP 地址。如果未设置,将使用通道名称类型:string

false

[role="no-hyphens"]capabilities

发送方或接收方客户端提出的功能的逗号分隔列表。类型:string

false

[role="no-hyphens"]client-options-name

[role="no-hyphens"](amqp-client-options-name)

用于自定义 AMQP 客户端配置的 AMQP 客户端选项 bean 的名称类型:string

false

[role="no-hyphens"]cloud-events

启用(默认)或禁用 Cloud Event 支持。如果在 incoming 通道上启用,则连接器将分析传入记录并尝试创建 Cloud Event 元数据。如果在 outgoing 上启用,则连接器将消息包括 Cloud Event 元数据的情况下发送为 Cloud Event。类型: boolean

false

true

[role="no-hyphens"]cloud-events-data-content-type

[role="no-hyphens"](cloud-events-default-data-content-type)

配置传出 Cloud 事件的默认 datacontenttype 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 datacontenttype 属性,则使用此值。类型: string

false

[role="no-hyphens"]cloud-events-data-schema

[role="no-hyphens"](cloud-events-default-data-schema)

配置传出 Cloud 事件的默认 dataschema 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 dataschema 属性,则使用此值。类型: string

false

[role="no-hyphens"]cloud-events-insert-timestamp

[role="no-hyphens"](cloud-events-default-timestamp)

是否让连接器自动将 time 属性插入到传出 Cloud 事件中。需要将 cloud-events 设置为 true。如果消息本身未配置 time 属性,则使用此值。类型: boolean

false

true

[role="no-hyphens"]cloud-events-mode

Cloud 事件模式( structuredbinary (默认值))。表示如何将 Cloud 事件写入到传出记录中。类型: string

false

binary

[role="no-hyphens"]cloud-events-source

[role="no-hyphens"](cloud-events-default-source)

配置传出 Cloud 事件的默认 source 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 source 属性,则使用此值。类型: string

false

[role="no-hyphens"]cloud-events-subject

[role="no-hyphens"](cloud-events-default-subject)

配置传出 Cloud 事件的默认 subject 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 subject 属性,则使用此值。类型: string

false

[role="no-hyphens"]cloud-events-type

[role="no-hyphens"](cloud-events-default-type)

配置传出 Cloud 事件的默认 type 属性。需要将 cloud-events 设置为 true。如果消息本身未配置 type 属性,则使用此值。类型: string

false

[role="no-hyphens"]connect-timeout

[role="no-hyphens"](amqp-connect-timeout)

连接超时(以毫秒为单位)类型:int

false

1000

[role="no-hyphens"]container-id

AMQP 容器 ID 类型:string

false

[role="no-hyphens"]credit-retrieval-period

经纪人授予的信用之间的重试间隔(以毫秒为单位)。当发送方用完信用时使用此时间。类型:int

false

2000

[role="no-hyphens"]durable

是否将发送的 AMQP 消息标记为持久性类型:boolean

false

false

[role="no-hyphens"]health-timeout

确定与代理的连接是否仍为可用性检查而建立的最大等待时间(以秒为单位)。经过该阈值后,该检查将被视为失败。类型:int

false

3

[role="no-hyphens"]host

[role="no-hyphens"](amqp-host)

The broker hostname

Type: string

false

localhost

[role="no-hyphens"]link-name

链接的名称。如果未设置,将使用通道名称。类型:string

false

[role="no-hyphens"]merge

连接器是否应该允许多个上游类型:“3”

false

false

[role="no-hyphens"]password

[role="no-hyphens"](amqp-password)

用于对代理进行身份验证的密码类型:string

false

[role="no-hyphens"]port

[role="no-hyphens"](amqp-port)

The broker port

Type: int

false

5672

[role="no-hyphens"]reconnect-attempts

[role="no-hyphens"](amqp-reconnect-attempts)

重新连接尝试的次数类型:int

false

100

[role="no-hyphens"]reconnect-interval

[role="no-hyphens"](amqp-reconnect-interval)

两次重新连接尝试之间的间隔时间(以秒为单位)类型:int

false

10

[role="no-hyphens"]sni-server-name

[role="no-hyphens"](amqp-sni-server-name)

如果设置,则显式覆盖用于 TLS SNI 服务器名的主机名类型:string

false

[role="no-hyphens"]tracing-enabled

启用(默认)或禁用跟踪类型: boolean

false

true

[role="no-hyphens"]ttl

发送的 AMQP 消息的生存时间。0 表示禁用生存时间类型:long

false

0

[role="no-hyphens"]use-anonymous-sender

连接器是否应使用匿名发送器。如果经纪人支持,则默认值是 true,否则是 false。如果不支持,则无法动态更改目标地址。类型:boolean

false

[role="no-hyphens"]use-ssl

[role="no-hyphens"](amqp-use-ssl)

AMQP 连接是否使用 SSL/TLS 类型:boolean

false

false

[role="no-hyphens"]username

[role="no-hyphens"](amqp-username)

用于对代理进行身份验证的用户名类型:string

false

[role="no-hyphens"]virtual-host

[role="no-hyphens"](amqp-virtual-host)

如果设置,则配置用于连接 AMQP 打开帧和 TLS SNI 服务器名(如果使用 TLS)的主机名值类型:string

false

Conditionally configure channels

您可以使用特定配置文件配置通道。因此,仅当启用了指定配置文件时才配置通道(并将它们添加到应用程序中)。

要实现此目的,您需要做以下操作:

  1. 使用 %my-profile`为 `mp.messaging.[incoming|outgoing].$channel`条目添加前缀,例如: `%my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. 可以使用 @Incoming(channel)@Outgoing(channel) 注解来启用 CDI bean 中的 @IfBuildProfile("my-profile"),但仅当启用配置文件时才需要启用它。

请注意,响应性消息传递会验证图形是否完整。因此,在使用这样的条件配置时,请确保应用程序可与已启用和未启用的配置文件配合使用。

请注意,此方法还可以根据配置文件来更改通道配置。