Annotation Support

除了用于配置消息端点的 XML 命名空间支持之外,还可以使用标注。首先,Spring Integration 提供类级 @MessageEndpoint 作为模式化标注,这意味着它本身使用 Spring 的 @Component 标注进行了标注,因此由 Spring 的组件扫描自动识别为 Bean 定义。

In addition to the XML namespace support for configuring message endpoints, you can also use annotations. First, Spring Integration provides the class-level @MessageEndpoint as a stereotype annotation, meaning that it is itself annotated with Spring’s @Component annotation and is therefore automatically recognized as a bean definition by Spring’s component scanning.

更重要的是各种方法级标注。它们指示标注方法能够处理消息。以下示例演示了类级和方法级标注:

Even more important are the various method-level annotations. They indicate that the annotated method is capable of handling a message. The following example demonstrates both class-level and method-level annotations:

@MessageEndpoint
public class FooService {

    @ServiceActivator
    public void processMessage(Message message) {
        ...
    }
}

对于方法以“handle”Message 来说,其确切含义取决于特定标注。Spring Integration 中可用的标注包括:

Exactly what it means for the method to “handle” the Message depends on the particular annotation. Annotations available in Spring Integration include:

如果您将 XML 配置与注解结合使用,则不需要 @MessageEndpoint 注解。如果您想从 <service-activator/> 元素的 ref 特性的 POJO 引用进行配置,您只需提供方法级的注解。在这种情况下,即使 <service-activator/> 元素上没有方法级特性,注解也能防止歧义。

If you use XML configuration in combination with annotations, the @MessageEndpoint annotation is not required. If you want to configure a POJO reference from the ref attribute of a <service-activator/> element, you can provide only the method-level annotations. In that case, the annotation prevents ambiguity even when no method-level attribute exists on the <service-activator/> element.

在大多数情况下,标注的处理程序方法不应该要求 Message 类型作为其参数。相反,方法参数类型可以匹配消息的有效负载类型,如下例所示:

In most cases, the annotated handler method should not require the Message type as its parameter. Instead, the method parameter type can match the message’s payload type, as the following example shows:

public class ThingService {

    @ServiceActivator
    public void bar(Thing thing) {
        ...
    }

}

当方法参数应该从 MessageHeaders 中的值进行映射时,另一种选择是使用参数级 @Header 标注。通常,使用 Spring Integration 标注标注的方法可以接受 Message 本身、消息有效负载或标头值(使用 @Header)作为参数。事实上,方法可以接受组合,如下例所示:

When the method parameter should be mapped from a value in the MessageHeaders, another option is to use the parameter-level @Header annotation. In general, methods annotated with the Spring Integration annotations can accept the Message itself, the message payload, or a header value (with @Header) as the parameter. In fact, the method can accept a combination, as the following example shows:

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
        ...
    }

}

您还可以使用 @Headers 标注将所有消息头作为 Map 提供,如下例所示:

You can also use the @Headers annotation to provide all the message headers as a Map, as the following example shows:

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

此注解的值也可以是 SpEL 表达式(例如,someHeader.toUpperCase()),当您希望在注入标头值之前对其进行操作时,此表达式非常有用。它还提供一个可选的 required 属性,指定标头值是否必须在标头内可用。required 属性的默认值为 true

The value of the annotation can also be a SpEL expression (for example, someHeader.toUpperCase()), which is useful when you wish to manipulate the header value before injecting it. It also provides an optional required property, which specifies whether the attribute value must be available within the headers. The default value for the required property is true.

对于这些标注中的几个,当消息处理方法返回非空值时,端点会尝试发送答复。这与两个配置选项(命名空间和标注)保持一致,即这种端点的输出通道(如果可用)将被使用,而 REPLY_CHANNEL 消息头值则将用作后备方案。

For several of these annotations, when a message-handling method returns a non-null value, the endpoint tries to send a reply. This is consistent across both configuration options (namespace and annotations) in that such an endpoint’s output channel is used (if available), and the REPLY_CHANNEL message header value is used as a fallback.

端点上的输出通道和答复通道消息头的组合启用了一种管道方法,其中多个组件有一个输出通道,而最终组件允许答复报文被转发到答复通道(正如原始请求报文中指定的)。换句话说,最终组件取决于原始发送方提供的信息,结果可以动态地支持任意数量的客户端。这是一个 return address 模式的示例。

The combination of output channels on endpoints and the reply channel message header enables a pipeline approach, where multiple components have an output channel and the final component allows the reply message to be forwarded to the reply channel (as specified in the original request message). In other words, the final component depends on the information provided by the original sender and can dynamically support any number of clients as a result. This is an example of the return address pattern.

除了此处显示的示例之外,这些注释还支持 inputChanneloutputChannel 属性,如下例所示:

In addition to the examples shown here, these annotations also support the inputChannel and outputChannel properties, as the following example shows:

@Service
public class ThingService {

    @ServiceActivator(inputChannel="input", outputChannel="output")
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

处理这些注释会创建与相应 XML 组件相同的 bean — AbstractEndpoint`实例和 `MessageHandler`实例(或入站通道适配器的 `MessageSource`实例)。请参阅 Annotations on `@Bean Methods。bean 的名称根据以下模式生成:[componentName].[methodName].[decapitalizedAnnotationClassShortName]。在上一个示例中,bean 的名称为 thingService.otherThing.serviceActivator(对于 AbstractEndpoint)和相同名称,以及额外的 .handler.source)后缀(对于 MessageHandler(MessageSource) bean)。可以使用 `@EndpointId`注释与这些消息传递注释一起来自定义此类名称。`MessageHandler`实例(`MessageSource`实例)也有资格被 the message history跟踪。

The processing of these annotations creates the same beans as the corresponding XML components — AbstractEndpoint instances and MessageHandler instances (or MessageSource instances for the inbound channel adapter). See Annotations on @Bean Methods. The bean names are generated from the following pattern: [componentName].[methodName].[decapitalizedAnnotationClassShortName]. In the preceding example the bean name is thingService.otherThing.serviceActivator for the AbstractEndpoint and the same name with an additional .handler (.source) suffix for the MessageHandler (MessageSource) bean. Such a name can be customized using an @EndpointId annotation alongside with these messaging annotations. The MessageHandler instances (MessageSource instances) are also eligible to be tracked by the message history.

从版本 4.0 开始,所有消息传递注释都提供 SmartLifecycle`选项(`autoStartup`和 `phase),以允许在应用程序上下文初始化时对端点生命周期进行控制。它们分别默认为 true`和 `0。要更改端点状态(例如 start()`或 `stop()),您可以使用 BeanFactory(或自动装配)获取对端点 bean 的引用并调用方法。或者,您可以向 Control Bus`发送命令消息(请参阅 Control Bus)。为此,您应该使用前一段中提到的 `beanName

Starting with version 4.0, all messaging annotations provide SmartLifecycle options (autoStartup and phase) to allow endpoint lifecycle control on application context initialization. They default to true and 0, respectively. To change the state of an endpoint (such as start() or stop()), you can obtain a reference to the endpoint bean by using the BeanFactory (or autowiring) and invoke the methods. Alternatively, you can send a command message to the Control Bus (see Control Bus). For these purposes, you should use the beanName mentioned earlier in the preceding paragraph.

在解析上述注释后自动创建的通道(当未配置特定通道 Bean 时)和相应的消费者端点会声明为 Bean,这些 Bean 在上下文初始化结束时会出现。这些 Bean 可以 在其他服务中自动装配,但它们必须标记为 @Lazy 注释,因为在常规自动装配过程中,这些定义通常仍不可用。

Channels automatically created after parsing the mentioned annotations (when no specific channel bean is configured), and the corresponding consumer endpoints, are declared as beans near the end of the context initialization. These beans can be autowired in other services, but they have to be marked with the @Lazy annotation because the definitions, typically, won’t yet be available during normal autowiring processing.

@Autowired
@Lazy
@Qualifier("someChannel")
MessageChannel someChannel;
...

@Bean
Thing1 dependsOnSPCA(@Qualifier("someInboundAdapter") @Lazy SourcePollingChannelAdapter someInboundAdapter) {
    ...
}

从 6.0 版本开始,所有消息注释现在都是 @Repeatable,因此可以在相同服务方法上声明多次相同类型注释,这意味着要创建与重复注释的数量一样的端点:

Starting with version 6.0, all the messaging annotations are @Repeatable now, so several of the same type can be declared on the same service method with the meaning to create as many endpoints as those annotations are repeated:

@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
    return input.toUpperCase();
}

Using the @Poller Annotation

在 Spring Integration 4.0 之前,消息注释要求 inputChannel 是对 SubscribableChannel 的引用。对于 PollableChannel 实例,需要使用 <int:bridge/> 元素来配置 <int:poller/>,并使复合端点成为 PollingConsumer。4.0 版本引入了 @Poller 注释,允许直接在消息注释上配置 poller 属性,如下例所示:

Before Spring Integration 4.0, messaging annotations required that the inputChannel be a reference to a SubscribableChannel. For PollableChannel instances, an <int:bridge/> element was needed to configure an <int:poller/> and make the composite endpoint be a PollingConsumer. Version 4.0 introduced the @Poller annotation to allow the configuration of poller attributes directly on the messaging annotations, as the following example shows:

public class AnnotationService {

    @Transformer(inputChannel = "input", outputChannel = "output",
        poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
    public String handle(String payload) {
        ...
    }
}

@Poller 注释仅提供简单的 PollerMetadata 选项。可以使用属性占位符配置 @Poller 注释的属性(maxMessagesPerPollfixedDelayfixedRatecron)。此外,从 5.1 版本开始,PollingConsumerreceiveTimeout 选项也已提供。如果需要提供更多轮询选项(例如 transactionadvice-chainerror-handler 等),应将 PollerMetadata 配置为通用 Bean,并使用其 Bean 名称作为 @Pollervalue 属性。在这种情况下,不允许使用其他属性(它们必须在 PollerMetadata Bean 上指定)。请注意,如果 inputChannelPollableChannel,并且未配置 @Poller,则会使用默认 PollerMetadata(如果它存在于应用程序上下文中)。若要使用 @Configuration 注释声明默认轮询器,请使用与以下示例类似的代码:

The @Poller annotation provides only simple PollerMetadata options. You can configure the @Poller annotation’s attributes (maxMessagesPerPoll, fixedDelay, fixedRate, and cron) with property placeholders. Also, starting with version 5.1, the receiveTimeout option for PollingConsumer s is also provided. If it is necessary to provide more polling options (for example, transaction, advice-chain, error-handler, and others), you should configure the PollerMetadata as a generic bean and use its bean name as the @Poller 's value attribute. In this case, no other attributes are allowed (they must be specified on the PollerMetadata bean). Note, if inputChannel is a PollableChannel and no @Poller is configured, the default PollerMetadata is used (if it is present in the application context). To declare the default poller by using a @Configuration annotation, use code similar to the following example:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

以下示例显示如何使用默认轮询器:

The following example shows how to use the default poller:

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
    public String handle(String payload) {
        ...
    }
}

以下示例显示如何使用具名轮询器:

The following example shows how to use a named poller:

@Bean
public PollerMetadata myPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(1000));
    return pollerMetadata;
}

以下示例展示了使用默认轮询器的端点:

The following example shows an endpoint that uses the default poller:

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
                           poller = @Poller("myPoller"))
    public String handle(String payload) {
         ...
    }
}

从版本 4.3.3 开始,@Poller`注释具有 `errorChannel`属性,以便更容易配置基础 `MessagePublishingErrorHandler。此属性在 `<poller>`XML 组件中扮演与 `error-channel`相同的作用。有关更多信息,请参阅 Endpoint Namespace Support

Starting with version 4.3.3, the @Poller annotation has the errorChannel attribute for easier configuration of the underlying MessagePublishingErrorHandler. This attribute plays the same role as error-channel in the <poller> XML component. See Endpoint Namespace Support for more information.

消息注释上的 poller() 属性与 reactive() 属性互斥。有关详细信息,请参见下一节。

The poller() attribute on the messaging annotations is mutually exclusive with the reactive() attribute. See next section for more information.

Using @Reactive Annotation

ReactiveStreamsConsumer 自 5.0 版本以来一直存在,但仅在端点的输入通道是 FluxMessageChannel(或任何 org.reactivestreams.Publisher 实现)时才应用。从 5.3 版本开始,即使输入通道类型,框架也会在其实例创建目标消息处理程序为 ReactiveMessageHandler 时创建其实例。对于从 5.5 版本开始的所有消息注释,都引入了 @Reactive 子注释(与上面提到的 @Poller 类似)。它接受可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> Bean 引用,并且独立于输入通道类型和消息处理程序,将目标端点转换为 ReactiveStreamsConsumer 实例。此函数是从 Flux.transform() 运算符使用的,以在来自输入通道的响应流源上应用一些自定义(publishOn()doOnNext()log()retry() 等)。

The ReactiveStreamsConsumer has been around since version 5.0, but it was applied only when an input channel for the endpoint is a FluxMessageChannel (or any org.reactivestreams.Publisher implementation). Starting with version 5.3, its instance is also created by the framework when the target message handler is a ReactiveMessageHandler independently of the input channel type. The @Reactive sub-annotation (similar to mentioned above @Poller) has been introduced for all the messaging annotations starting with version 5.5. It accepts an optional Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> bean reference and, independently of the input channel type and message handler, turns the target endpoint into the ReactiveStreamsConsumer instance. The function is used from the Flux.transform() operator to apply some customization (publishOn(), doOnNext(), log(), retry() etc.) on a reactive stream source from the input channel.

以下示例演示如何独立于最终订阅者和 DirectChannel 的生成器,从输入通道更改发布线程:

The following example demonstrates how to change the publishing thread from the input channel independently of the final subscriber and producer to that DirectChannel:

@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
    return flux -> flux.publishOn(Schedulers.parallel());
}

@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
    ...
}

消息传递注释上的 reactive()`属性与 `poller()`属性互斥。有关更多信息,请参阅 Using the `@Poller AnnotationReactive Streams Support

The reactive() attribute on the messaging annotations is mutually exclusive with the poller() attribute. See Using the @Poller Annotation and Reactive Streams Support for more information.

Using the @InboundChannelAdapter Annotation

4.0 版中引入了 @InboundChannelAdapter 方法级注解。它根据标注方法的 MethodInvokingMessageSource 来生成一个 SourcePollingChannelAdapter 集成组件。此注解类似于 <int:inbound-channel-adapter> XML 组件,并具有相同的限制:该方法不能有参数,且返回类型不能为 void。它有两个属性:value(必需的 MessageChannel Bean 名)和 poller(可选的 @Poller 注解,如 described earlier)。如果您需要提供一些 MessageHeaders,可以使用 Message<?> 返回类型,并使用 MessageBuilder 来构建 Message<?>。使用 MessageBuilder 让您可以配置 MessageHeaders。以下示例演示了如何使用 @InboundChannelAdapter 注解:

Version 4.0 introduced the @InboundChannelAdapter method-level annotation. It produces a SourcePollingChannelAdapter integration component based on a MethodInvokingMessageSource for the annotated method. This annotation is an analogue of the <int:inbound-channel-adapter> XML component and has the same restrictions: The method cannot have parameters, and the return type must not be void. It has two attributes: value (the required MessageChannel bean name) and poller (an optional @Poller annotation, as described earlier). If you need to provide some MessageHeaders, use a Message<?> return type and use a MessageBuilder to build the Message<?>. Using a MessageBuilder lets you configure the MessageHeaders. The following example shows how to use an @InboundChannelAdapter annotation:

@InboundChannelAdapter("counterChannel")
public Integer count() {
    return this.counter.incrementAndGet();
}

@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
    return "foo";
}

4.3 版本引入了 channel 别名,用于 value 注释属性,以提供更好的源代码可读性。此外,目标 MessageChannel Bean 是在第一次 receive() 调用期间而不是在初始化阶段由指定的名称(由 outputChannelName 选项设置)在 SourcePollingChannelAdapter 中解析的。它允许使用“延迟绑定”逻辑:从消费者的角度来看,目标 MessageChannel Bean 会在稍晚于 @InboundChannelAdapter 解析阶段创建并注册。

Version 4.3 introduced the channel alias for the value annotation attribute, to provide better source code readability. Also, the target MessageChannel bean is resolved in the SourcePollingChannelAdapter by the provided name (set by the outputChannelName option) on the first receive() call, not during the initialization phase. It allows “late binding” logic: The target MessageChannel bean from the consumer perspective is created and registered a bit later than the @InboundChannelAdapter parsing phase.

第一个示例要求已在应用程序上下文中声明了默认轮询器。

The first example requires that the default poller has been declared elsewhere in the application context.

使用 @MessagingGateway 注释

Using the @MessagingGateway Annotation

Using the @IntegrationComponentScan Annotation

标准 Spring 框架 @ComponentScan 注解不会扫描接口的模式 @Component 注解。为了克服此限制并允许 @MessagingGateway 配置(参见 @MessagingGateway Annotation),我们引入了 @IntegrationComponentScan 机制。此注解必须与 @Configuration 注解一起放置,并定制以定义其扫描选项,例如 basePackagesbasePackageClasses。在这种情况下,所有带有 @MessagingGateway 标注的发现的接口都将被解析并注册为 GatewayProxyFactoryBean 实例。所有其他基于类的组件都将由标准 @ComponentScan 解析。

The standard Spring Framework @ComponentScan annotation does not scan interfaces for stereotype @Component annotations. To overcome this limitation and allow the configuration of @MessagingGateway (see @MessagingGateway Annotation), we introduced the @IntegrationComponentScan mechanism. This annotation must be placed with a @Configuration annotation and be customized to define its scanning options, such as basePackages and basePackageClasses. In this case, all discovered interfaces annotated with @MessagingGateway are parsed and registered as GatewayProxyFactoryBean instances. All other class-based components are parsed by the standard @ComponentScan.