Inbound Channel Adapters: Polling Multiple Servers and Directories
从 5.0.7 版本开始,提供了 RotatingServerAdvice
;当配置为轮询器建议时,入站适配器可以轮询多个服务器和目录。配置建议并将其添加到轮询器的建议链中,如常。`DelegatingSessionFactory`用于选择服务器,有关详细信息,请参见 Delegating Session Factory。建议配置包含一个 `RotationPolicy.KeyDirectory`对象列表。
Example
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
此建议将在服务器 one
上轮询目录 foo
,直至不存在新文件,然后移至服务器 two
上的目录 bar
,然后移至服务器 two
上的目录 baz
,依此类推。
此默认行为可以通过 fair
构造函数参数进行修改:
fair
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
在这种情况下,无论前一个轮询是否返回文件,此建议都将移至下一个服务器/目录。
或者,你可以提供你自己的 RotationPolicy
以根据需要重新配置消息源:
policy
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
和
custom
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
现在,local-filename-generator-expression
属性(同步器上的 localFilenameGeneratorExpression
)可以包含 #remoteDirectory
变量。这允许从不同目录检索的文件下载到本地类似的目录:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
使用此建议时,请勿在轮询器上配置 TaskExecutor
;有关更多信息,请参阅 Conditional Pollers for Message Sources。