我更多地使用我非常感兴趣的 spring 集成,但我认为有一种奇怪的行为我找不到答案。

我有一个使用 Queue-Channel 的简单应用程序:

<int:channel id="ticketChannel" datatype="ch.elca.prototype.model.Ticket"> 
    <int:queue capacity="1"/> 
</int:channel> 

我也试过 Rendezvous-Queue 有同样的效果:

<int:channel id="ticketChannel" datatype="ch.elca.prototype.model.Ticket"> 
    <int:rendezvous-queue/> 
</int:channel> 

按照我的理解,现在应该只能在该 channel 内一次移动一条消息了。也许 2,如果你认为你有一个额外的容量。我不确定如何阅读它。但是我可以在不消耗的情况下向该 channel 发送四次,这对我来说有点奇怪,我当时不了解容量。

请参阅以下内容:

主要应用: 在这里,我流式传输 10 张票,并为每张票调用 openTicket:

public static void main(final String[] args) throws InterruptedException { 
    try (ConfigurableApplicationContext context = SpringApplication.run(SassSimulatorApplication2.class, args)) { 
        final TicketGenerator generator = context.getBean(TicketGenerator.class); 
        final ProblemReporter reporter = context.getBean(ProblemReporter.class); 
        generator.createTickets().limit(10).forEach(reporter::openTicket); 
        context.close(); 
    } 
} 

问题报告者:

public class ProblemReporter { 
    private volatile QueueChannel channel; 
 
    public synchronized void openTicket(final Ticket ticket){ 
        final Message<Ticket> build = TicketMessageBuilder.buildMessage(ticket); 
        boolean send = channel.send(build); 
 
        System.out.println("send: " + send); 
        System.out.println("getQueueSize: " + channel.getQueueSize()); 
        System.out.println("getSendCount: " + channel.getSendCount()); 
        System.out.println("getReceiveCount: " + channel.getReceiveCount()); 
        System.out.println("getSendErrorCount: " + channel.getSendErrorCount()); 
        System.out.println("getRemainingCapacity: " + channel.getRemainingCapacity()); 
    } 
 
    @Value("#{ticketChannel}") 
    public void setChannel(final QueueChannel channel) { 
        this.channel = channel; 
    } 
} 

启动应用程序时,我得到以下信息:

send: true 
getQueueSize: 0 
getSendCount: 0 
getReceiveCount: 0 
getSendErrorCount: 0 
getRemainingCapacity: 1 
 
send: true 
getQueueSize: 0 
getSendCount: 0 
getReceiveCount: 0 
getSendErrorCount: 0 
getRemainingCapacity: 1 
 
send: true 
getQueueSize: 1 
getSendCount: 0 
getReceiveCount: 0 
getSendErrorCount: 0 
getRemainingCapacity: 0 
 
send: true 
getQueueSize: 1 
getSendCount: 0 
getReceiveCount: 0 
getSendErrorCount: 0 
getRemainingCapacity: 0 

我正在使用 Spring-Boot 1.3.3,Sprint-Integration 4.2.5.RELEASE。我还尝试了 Spring-Boot 1.2.8 和 Spring-Integration 4.1.9。

这是预期的行为吗???

提前致谢。

请您参考如下方法:

看起来您的 channel.send(build, 30000); 是针对 local 变量而不是共享 bean 完成的。 我的测试用例看起来像:

QueueChannel channel = new QueueChannel(3); 
 
IntStream.range(0, 4) 
        .forEach(i -> { 
            boolean send = channel.send(new GenericMessage<>("test-" + i), 100); 
            System.out.println("send: " + send); 
            System.out.println("getQueueSize: " + channel.getQueueSize()); 
            System.out.println("getRemainingCapacity: " + channel.getRemainingCapacity()); 
        }); 

结果是:

send: true 
getQueueSize: 1 
getRemainingCapacity: 2 
send: true 
getQueueSize: 2 
getRemainingCapacity: 1 
send: true 
getQueueSize: 3 
getRemainingCapacity: 0 
send: false 
getQueueSize: 3 
getRemainingCapacity: 0 

注意:sendCount(以及类似的)只能通过@EnableIntegrationMBeanExport@EnableIntegrationManagement 启用。 参见 Management在引用手册中。

您还可以在框架中找到有关此事的一些测试用例,例如QueueChannelTests .


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!