我是 RxJava 的新手,正在尝试从支持无损背压的 RabbitMQ 队列中实现消息的 Observable。我设法从 Spring AMQP MessageListener 创建了一个 Observable。这可以在同步环境中很好地处理背压(例如 callstack blocking ),但是一旦引入多线程,背压就会消失——正如您所期望的那样。类(class)如下:

import org.springframework.amqp.core.MessageListener; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.stereotype.Component; 
import rx.Observable; 
import rx.subscriptions.Subscriptions; 
 
import javax.inject.Inject; 
 
@Component 
public class CommandExchange { 
    private final MessageConverter messageConverter; 
    private final ConnectionFactory connectionFactory; 
 
    @Inject 
    public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) { 
        this.messageConverter = messageConverter; 
        this.connectionFactory = connectionFactory; 
    } 
 
    public <T extends Command> Observable<T> observeQueue(String... queueNames) { 
        return Observable.create(subscriber -> { 
 
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
            container.setConnectionFactory(connectionFactory); 
            container.setQueueNames(queueNames); 
            container.setMessageListener((MessageListener) message -> { 
                T command = (T) messageConverter.fromMessage(message); 
                if (!subscriber.isUnsubscribed()) { 
                    System.out.println("Being asked for a message."); 
                    subscriber.onNext(command); 
                } 
            }); 
            container.start(); 
 
            Subscriptions.create(container::shutdown); 
 
        }); 
    } 
} 

我不知道如何在没有阻塞或缓冲的情况下实现无损背压。使用缓冲是没有意义的,因为 Rabbit MQ 队列已经是一个缓冲区——所以只有当订阅者准备好接收消息时,才应该从队列中使用消息。解决方案是使用基于拉的可观察对象(即停止使用监听器,而是使用 grab a message when there is demand from the subscriber )?如果是这样,处理队列中当前没有消息的情况的最佳做法是什么?

请您参考如下方法:

是的,我会停止使用监听器并按需从队列中获取消息。如果你使用

Observable.create(new SyncOnSubscribe<T>() {...}); 

SyncOnSubscribe 中,您或多或少只是指定了获取一条消息所采取的操作(如果没有等待,则不指定)。


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!