DSL Extensions

从版本 5.3 开始,引入了 IntegrationFlowExtension 以允许使用自定义或复合 EIP 操作符扩展现有的 Java DSL。所需要做的就是扩展此类,提供可在 IntegrationFlow Bean 定义中使用的方法。扩展类也可用于自定义 IntegrationComponentSpec 配置;例如,可以将丢失或默认选项实现到现有的 IntegrationComponentSpec 扩展中。以下示例演示了复合自定义操作符和对默认自定义 outputProcessor 使用 AggregatorSpec 扩展的情况:

Starting with version 5.3, an IntegrationFlowExtension has been introduced to allow extension of the existing Java DSL with custom or composed EIP-operators. All that is needed is an extension of this class that provides methods which can be used in the IntegrationFlow bean definitions. The extension class can also be used for custom IntegrationComponentSpec configuration; for example, missed or default options can be implemented in the existing IntegrationComponentSpec extension. The sample below demonstrates a composite custom operator and usage of an AggregatorSpec extension for a default custom outputProcessor:

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}

对于方法链流,这些扩展中的新 DSL 操作符必须返回扩展类。这样,目标 IntegrationFlow 定义将使用新的和现有的 DSL 操作符:

For a method chain flow the new DSL operator in these extensions must return the extension class. This way a target IntegrationFlow definition will work with new and existing DSL operators:

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}