Inbound Channel Adapters: Polling Multiple Servers and Directories

从 5.0.7 版本开始,提供了 RotatingServerAdvice;当配置为轮询器建议时,入站适配器可以轮询多个服务器和目录。配置建议并将其添加到轮询器的建议链中,如常。`DelegatingSessionFactory`用于选择服务器,有关详细信息,请参见 Delegating Session Factory。建议配置包含一个 `RotationPolicy.KeyDirectory`对象列表。

Starting with version 5.0.7, the RotatingServerAdvice is available; when configured as a poller advice, the inbound adapters can poll multiple servers and directories. Configure the advice and add it to the poller’s advice chain as normal. A DelegatingSessionFactory is used to select the server see Delegating Session Factory for more information. The advice configuration consists of a list of RotationPolicy.KeyDirectory objects.

Example
@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

此建议将在服务器 one 上轮询目录 foo,直至不存在新文件,然后移至服务器 two 上的目录 bar,然后移至服务器 two 上的目录 baz,依此类推。

This advice will poll directory foo on server one until no new files exist then move to directory bar and then directory baz on server two, etc.

此默认行为可以通过 fair 构造函数参数进行修改:

This default behavior can be modified with the fair constructor arg:

fair
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

在这种情况下,无论前一个轮询是否返回文件,此建议都将移至下一个服务器/目录。

In this case, the advice will move to the next server/directory regardless of whether the previous poll returned a file.

或者,你可以提供你自己的 RotationPolicy 以根据需要重新配置消息源:

Alternatively, you can provide your own RotationPolicy to reconfigure the message source as needed:

policy
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}

and

custom
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

现在,local-filename-generator-expression 属性(同步器上的 localFilenameGeneratorExpression)可以包含 #remoteDirectory 变量。这允许从不同目录检索的文件下载到本地类似的目录:

The local-filename-generator-expression attribute (localFilenameGeneratorExpression on the synchronizer) can now contain the #remoteDirectory variable. This allows files retrieved from different directories to be downloaded to similar directories locally:

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Sftp.inboundAdapter(sf())
                    .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}

使用此建议时,请勿在轮询器上配置 TaskExecutor;有关更多信息,请参阅 Conditional Pollers for Message Sources

Do not configure a TaskExecutor on the poller when using this advice; see Conditional Pollers for Message Sources for more information.