IT序号网

ActiveMQ消息的消费原理知识解答

flyfish 2021年06月07日 大数据 152 0

消费端消费消息:

  在 初识ActiveMQ 中我提到过,两种方法可以接收消息,一种是使用同步阻塞的ActiveMQMessageConsumer#receive方法。另一种是使用消息监听器MessageListener。这里需要注意的是,在同一个session下,这两者不能同时工作,也就是说不能针对不同消息采用不同的接收方式。否则会抛出异常。至于为什么这么做,最大的原因还是在事务性会话中,两种消费模式的事务不好管控。

  先通过ActiveMQMessageConsumer#receive 方法来对消息的接受一探究竟:

public Message receive() throws JMSException { 
        checkClosed(); 
        //检查receive和MessageListener是否同时配置在当前的会话中,有则抛出异常 
        checkMessageListener(); 
        //如果PrefetchSizeSize为0并且unconsumerMessage为空,则发起pull命令 
        sendPullCommand(0); 
        MessageDispatch md = dequeue(-1);//出列,获取消息 
        if (md == null) { 
            return null; 
        } 
        beforeMessageIsConsumed(md); 
        //发送ack给到broker 
        afterMessageIsConsumed(md, false); 
        //获取消息并返回 
        return createActiveMQMessage(md); 
    } 

  下面简单的说一下以上几个核心方法中做了什么不为人知的事:

  sendPullCommand(0) :发送pull命令从broker上获取消息,前提是prefetchSize=0并且unconsumedMessages为空。unconsumedMessage表示未消费的消息,这里面预读取的消息大小为prefetchSize的值

protected void sendPullCommand(long timeout) throws JMSException { 
        clearDeliveredList(); 
        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { 
            MessagePull messagePull = new MessagePull(); 
            messagePull.configure(info); 
            messagePull.setTimeout(timeout); 
            //向服务端异步发送messagePull指令 
            session.asyncSendPacket(messagePull); 
        } 
    } 

  这里发送异步消息跟消息生产的原理是一样的。通过包装链去调用 Sokect 发送请求。

  clearDeliveredList():

  在上面的sendPullCommand方法中,会先调用clearDeliveredList方法,主要用来清理已经分发的消息链表deliveredMessages,存储分发给消费者但还为应答的消息链表

    Ø 如果session是事务的,则会遍历deliveredMessage中的消息放入到previouslyDeliveredMessage中来做重发
    Ø 如果session是非事务的,根据ACK的模式来选择不同的应答操作

  这是个同步的过程:

    private void clearDeliveredList() { 
        if (clearDeliveredList) {//判断是否清楚 
            synchronized (deliveredMessages) {//采用双重检查锁 
                if (clearDeliveredList) { 
                    if (!deliveredMessages.isEmpty()) { 
                        if (session.isTransacted()) {//是事务消息 
                            if (previouslyDeliveredMessages == null) { 
                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId()); 
                            } 
                            for (MessageDispatch delivered : deliveredMessages) { 
                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); 
                            } 
                            LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt", 
                                      getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size()); 
                        } else { 
                            if (session.isClientAcknowledge()) { 
                                LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); 
                                // allow redelivery 
                                if (!this.info.isBrowser()) { 
                                    for (MessageDispatch md: deliveredMessages) { 
                                        this.session.connection.rollbackDuplicate(this, md.getMessage()); 
                                    } 
                                } 
                            } 
                            LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); 
                            deliveredMessages.clear(); 
                            pendingAck = null; 
                        } 
                    } 
                    clearDeliveredList = false; 
                } 
            } 
        } 
    }

  dequeue(-1) :从unconsumedMessage中取出一个消息,在创建一个消费者时,就会为这个消费者创建一个未消费的消息通道,这个通道分为两种,一种是简单优先级队列分发通道SimplePriorityMessageDispatchChannel ;另一种是先进先出的分发通道FifoMessageDispatchChannel.至于为什么要存在这样一个消息分发通道,大家可以想象一下,如果消费者每次去消费完一个消息以后再去broker拿一个消息,效率是比较低的。所以通过这样的设计可以允许session能够一次性将多条消息分发给一个消费者。默认情况下对于queue来说,prefetchSize的值是1000

private MessageDispatch dequeue(long timeout) throws JMSException { 
        try { 
            long deadline = 0; 
            if (timeout > 0) { 
                deadline = System.currentTimeMillis() + timeout; 
            } 
            while (true) {//protected final MessageDispatchChannel unconsumedMessages; 
                MessageDispatch md = unconsumedMessages.dequeue(timeout); 
 
            ........... 
    } 

  beforeMessageIsConsumed(md):这里面主要是做消息消费之前的一些准备工作,如果ACK类型不是DUPS_OK_ACKNOWLEDGE或者队列模式(简单来说就是除了Topic和DupAck这两种情况),所有的消息先放到deliveredMessages链表的开头。并且如果当前是事务类型的会话,则判断transactedIndividualAck,如果为true,表示单条消息直接返回ack。

  否则,调用ackLater,批量应答, client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),等到这些消息的条数达到一定阀值时,只需要通过一个ACK指令把它们全部确认;这比对每条消息都逐个确认,在性能上要提高很多。

private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { 
        md.setDeliverySequenceId(session.getNextDeliveryId()); 
        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 
        if (!isAutoAcknowledgeBatch()) { 
            synchronized(deliveredMessages) { 
                deliveredMessages.addFirst(md); 
            } 
            if (session.getTransacted()) { 
                if (transactedIndividualAck) { 
                    immediateIndividualTransactedAck(md); 
                } else { 
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 
                } 
            } 
        } 
    } 

  afterMessageIsConsumed:这个方法的主要作用是执行应答操作,这里面做以下几个操作
    Ø 如果消息过期,则返回消息过期的ack
    Ø 如果是事务类型的会话,则不做任何处理
    Ø 如果是AUTOACK或者(DUPS_OK_ACK且是队列),并且是优化ack操作,则走批量确认ack
    Ø 如果是DUPS_OK_ACK,则走ackLater逻辑
    Ø 如果是CLIENT_ACK,则执行ackLater

private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { 
        if (unconsumedMessages.isClosed()) { 
            return; 
        } 
        if (messageExpired) { 
            acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); 
            stats.getExpiredMessageCount().increment(); 
        } else { 
            stats.onMessage(); 
            if (session.getTransacted()) { 
                // Do nothing. 
            } else if (isAutoAcknowledgeEach()) { 
                if (deliveryingAcknowledgements.compareAndSet(false, true)) { 
                    synchronized (deliveredMessages) { 
                        if (!deliveredMessages.isEmpty()) { 
                            if (optimizeAcknowledge) { 
                                ackCounter++; 
 
                                // AMQ-3956 evaluate both expired and normal msgs as 
                                // otherwise consumer may get stalled 
                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { 
                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 
                                    if (ack != null) { 
                                        deliveredMessages.clear(); 
                                        ackCounter = 0; 
                                        session.sendAck(ack); 
                                        optimizeAckTimestamp = System.currentTimeMillis(); 
                                    } 
                                    // AMQ-3956 - as further optimization send 
                                    // ack for expired msgs when there are any. 
                                    // This resets the deliveredCounter to 0 so that 
                                    // we won't sent standard acks with every msg just 
                                    // because the deliveredCounter just below 
                                    // 0.5 * prefetch as used in ackLater() 
                                    if (pendingAck != null && deliveredCounter > 0) { 
                                        session.sendAck(pendingAck); 
                                        pendingAck = null; 
                                        deliveredCounter = 0; 
                                    } 
                                } 
                            } else { 
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 
                                if (ack!=null) { 
                                    deliveredMessages.clear(); 
                                    session.sendAck(ack); 
                                } 
                            } 
                        } 
                    } 
                    deliveryingAcknowledgements.set(false); 
                } 
            } else if (isAutoAcknowledgeBatch()) { 
                ackLater(md, MessageAck.STANDARD_ACK_TYPE); 
            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { 
                boolean messageUnackedByConsumer = false; 
                synchronized (deliveredMessages) { 
                    messageUnackedByConsumer = deliveredMessages.contains(md); 
                } 
                if (messageUnackedByConsumer) { 
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 
                } 
            } 
            else { 
                throw new IllegalStateException("Invalid session state."); 
            } 
        } 
    } 

  其实在以上消息的接收过程中,我们仅仅能看到这个消息从一个本地变量中出队,并没有对远程消息中心发送通讯获取,那么这个消息时什么时候过来的呢?也就是消息出队中  unconsumedMessages 这个东东时什么时候初始化的呢 ?那么接下去我们应该去通过创建连接的时候去看看了,具体连接的时候都做了什么呢:connectionFactory.createConnection()

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 
        if (brokerURL == null) { 
            throw new ConfigurationException("brokerURL not set."); 
        } 
        ActiveMQConnection connection = null; 
        try {// 果然发现了这个东东的初始化 
            Transport transport = createTransport(); 
            // 创建连接 
            connection = createActiveMQConnection(transport, factoryStats); 
            // 设置用户密码 
            connection.setUserName(userName); 
            connection.setPassword(password); 
            // 对连接做包装 
            configureConnection(connection); 
            // 启动一个后台传输线程 
            transport.start(); 
            // 设置客户端消费的id 
            if (clientID != null) { 
                connection.setDefaultClientID(clientID); 
            } 
  
            return connection; 
        } ...... 
   }

  创建连接的过程就是创建除了一个带有链路包装的TcpTransport 并且创建连接,最后启动一个传输线程,而这里的 transport.start()  调用的应该是TcpTransport 里面的方法,然而这个类中并没有 start,而是在父类
ServiceSupport.start()中:

public void start() throws Exception { 
        if (started.compareAndSet(false, true)) { 
            boolean success = false; 
            stopped.set(false); 
            try { 
                preStart();//一些初始化 
                doStart(); 
                success = true; 
            } finally { 
                started.set(success); 
            } 
            for(ServiceListener l:this.serviceListeners) { 
                l.started(this); 
            } 
        } 
    } 

  doStart 方法前做了一系列的初始化,然后调用 TcpTransport的doStart() 方法:

protected void doStart() throws Exception { 
        connect(); 
        stoppedLatch.set(new CountDownLatch(1)); 
        super.doStart(); 
    } 

  继而构建一个连接 设置一个 CountDownLatch 门闩 ,调用父类 TransportThreadSupport 的方法,新建了一个精灵线程并且启动:

protected void doStart() throws Exception { 
        runner = new Thread(null, this, "ActiveMQ Transport: " + toString(), stackSize); 
        runner.setDaemon(daemon); 
        runner.start(); 
    } 

  调用TransportThreadSupport.doStart(). 创建了一个线程,传入的是 this,调用子类的 run 方法,也就是 TcpTransport.run().

public void run() { 
        LOG.trace("TCP consumer thread for " + this + " starting"); 
        this.runnerThread=Thread.currentThread(); 
        try { 
            while (!isStopped()) { 
                doRun(); 
            } 
        } catch (IOException e) { 
            stoppedLatch.get().countDown(); 
            onException(e); 
        } catch (Throwable e){ 
            stoppedLatch.get().countDown(); 
            IOException ioe=new IOException("Unexpected error occurred: " + e); 
            ioe.initCause(e); 
            onException(ioe); 
        }finally { 
            stoppedLatch.get().countDown(); 
        } 
    } 

  run 方法主要是从 socket 中读取数据包,只要 TcpTransport 没有停止,它就会不断去调用 doRun:这里面,通过 wireFormat 对数据进行格式化,可以认为这是一个反序列化过程。wireFormat 默认实现是 OpenWireFormat,activeMQ 自定义的跨语言的wire 协议

protected void doRun() throws IOException { 
        try {//通过 readCommand 去读取数据 
            Object command = readCommand(); 
            //消费消息 
            doConsume(command); 
        } catch (SocketTimeoutException e) { 
        } catch (InterruptedIOException e) { 
        } 
    } 
protected Object readCommand() throws IOException { 
        return wireFormat.unmarshal(dataIn); 
} 

  这里的读取流的部分就是从Socket里面读取,而这个连接的 输入/输出流的初始化在  TcpTransport

protected void initializeStreams() throws Exception { 
        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) { 
            @Override 
            public int read() throws IOException { 
                receiveCounter++; 
                return super.read(); 
            } 
            @Override 
            public int read(byte[] b, int off, int len) throws IOException { 
                receiveCounter++; 
                return super.read(b, off, len); 
            } 
            @Override 
            public long skip(long n) throws IOException { 
                receiveCounter++; 
                return super.skip(n); 
            } 
            @Override 
            protected void fill() throws IOException { 
                receiveCounter++; 
                super.fill(); 
            } 
        }; 
        //Unread the initBuffer that was used for protocol detection if it exists 
        //so the stream can start over 
        if (initBuffer != null) { 
            buffIn.unread(initBuffer.buffer.array()); 
        } 
        this.dataIn = new DataInputStream(buffIn); 
        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); 
        this.dataOut = new DataOutputStream(outputStream); 
        this.buffOut = outputStream; 
 
    }

  doConsume:流程走到了消费消息: 

public void doConsume(Object command) { 
        if (command != null) {//表示已经拿到了消息 
            if (transportListener != null) { 
                transportListener.onCommand(command); 
            } else { 
                LOG.error("No transportListener available to process inbound command: " + command); 
            } 
        } 
    } 

  TransportSupport 类中唯一的成员变量是 TransportListener transportListener;,这也意味着一个 Transport 支持类绑定一个传送监听器类,传送监听器接口 TransportListener 最重要的方法就是 void onCommand(Object command);,它用来处理命令。那么这个 transportListener 是在那里初始化的呢?可以思考一下 既然是TransportSupport 唯一的成员变量,而我们锁创建的TcpTransport 是他的子类,那么是不是在创建该transport的时候亦或是在对他进行包装处理的时候做了初始化呢? 我们会在流程中看到在新建 ActiveMQConnectionFactory 的时候有一行关键的代码:

connection = createActiveMQConnection(transport, factoryStats); 

  在这个方法里面追溯下去:会进入 ActiveMQConnection 的构造方法

protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { 
 
        this.transport = transport; 
        this.clientIdGenerator = clientIdGenerator; 
        this.factoryStats = factoryStats; 
 
        // Configure a single threaded executor who's core thread can timeout if 
        // idle 
        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 
            @Override 
            public Thread newThread(Runnable r) { 
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); 
                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796 
                //thread.setDaemon(true); 
                return thread; 
            } 
        }); 
        // asyncConnectionThread.allowCoreThreadTimeOut(true); 
        String uniqueId = connectionIdGenerator.generateId(); 
        this.info = new ConnectionInfo(new ConnectionId(uniqueId)); 
        this.info.setManageable(true); 
        this.info.setFaultTolerant(transport.isFaultTolerant()); 
        this.connectionSessionId = new SessionId(info.getConnectionId(), -1); 
 
        this.transport.setTransportListener(this); 
 
        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); 
        this.factoryStats.addConnection(this); 
        this.timeCreated = System.currentTimeMillis(); 
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); 
    } 

  从以上代码我们发现  this.transport.setTransportListener(this); 那么这个this是什么呢  ? 正是ActiveMQConnection ,看了一眼该类,发现这个类实现了 TransportListener ,本身就是一个TransportListener。所以上面 transportListener.onCommand(command); 就是 ActiveMQConnection.onCommand(command)。除了和 Transport相互绑定,还对线程池执行器 executor 进行了初始化。这哥执行器是后来要进行消息处理的。

  这里面会针对不同的消息做分发,在ActiveMQMessageConsumer#receive方法中锁dequeue所返回的对象是MessageDispatch 。假设这里传入的 command 是MessageDispatch,那么这个 command 的 visit 方法就会调用processMessageDispatch 方法。剪切出其中的代码片段:

public Response processMessageDispatch(MessageDispatch md) throws Exception { 
       // 等待 Transport 中断处理完成 
       waitForTransportInterruptionProcessingToComplete(); 
       // 这里通过消费者 ID 来获取消费者对象 
//(ActiveMQMessageConsumer 实现了 ActiveMQDispatcher 接口),所以 
//MessageDispatch 包含了消息应该被分配到那个消费者的映射信息 
       ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 
       if (dispatcher != null) { 
       // Copy in case a embedded broker is dispatching via 
       // vm:// 
       // md.getMessage() == null to signal end of queue 
       // browse. 
       Message msg = md.getMessage(); 
       if (msg != null) { 
       msg = msg.copy(); 
       msg.setReadOnlyBody(true); 
       msg.setReadOnlyProperties(true); 
       msg.setRedeliveryCounter(md.getRedeliveryCounter()); 
       msg.setConnection(ActiveMQConnection.this); 
       msg.setMemoryUsage(null); 
       md.setMessage(msg); 
       } 
       // 调用会话ActiveMQSession 自己的 dispatch 方法来处理这条消息 
       dispatcher.dispatch(md); 
       } else { 
           LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers); 
       } 
       return null; 
}

  其中 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());这行代码的 dispatchers 是在 通过session.createConsumer(destination); 的时候通过 ActiveMQMessageConsumer 的构造方法中有一行代码 :this.session.addConsumer(this); 将 this传入,即 ActiveMQMessageConsumer 对象。而这个 addConsumer 方法:

protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 
        this.consumers.add(consumer); 
        if (consumer.isDurableSubscriber()) { 
            stats.onCreateDurableSubscriber(); 
        } 
        this.connection.addDispatcher(consumer.getConsumerId(), this); 
    } 

  可以发现这里的初始化了:this.connection.addDispatcher(consumer.getConsumerId(), this); 这里的this 即 ActiveMQSession。所以回到 ActiveMQConnection#onCommand方法内 processMessageDispatch 这个方法最后调用了 dispatcher.dispatch(md); 这个方法的核心功能就是处理消息的分发。:

public void dispatch(MessageDispatch messageDispatch) { 
        try { 
            executor.execute(messageDispatch); 
        } catch (InterruptedException e) { 
            Thread.currentThread().interrupt(); 
            connection.onClientInternalException(e); 
        } 
    } 

  这里离我们真正要找的进行消息入队的结果很近了,进入executor.execute(messageDispatch);这个方法:

void execute(MessageDispatch message) throws InterruptedException { 
 
       ........... 
//如果会话不是异步分发并且没有使用 sessionpool 分发,则调用 dispatch 发送消息 
        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) { 
            dispatch(message); 
        } else {//将消息直接放到队列里 
            messageQueue.enqueue(message); 
            wakeup(); 
        } 
    } 

  这里最后终于发现了入队,判断是否异步分发,不是的话走dispatch(message) 否则进入异步分发。默认是采用异步消息分发。所以,直接调用 messageQueue.enqueue,把消息放到队列中,并且调用 wakeup 方法:

public void wakeup() { 
        if (!dispatchedBySessionPool) {//进一步验证 
           // //判断 session 是否为异步分发 
            if (session.isSessionAsyncDispatch()) { 
                try { 
                    TaskRunner taskRunner = this.taskRunner; 
                    if (taskRunner == null) { 
                        synchronized (this) { 
                            if (this.taskRunner == null) { 
                                if (!isRunning()) { 
                                    // stop has been called 
                                    return; 
                                } 
//通过 TaskRunnerFactory 创建了一个任务运行类 taskRunner,这里把自己作为一个 task 传入到 createTaskRunner 中, 
//说明当前的类一定是实现了 Task 接口的. 简单来说,就是通过线程池去执行一个任务,完成异步调度 
//这里由于executor != null 所以这个task的类型是PooledTaskRunner 
                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, 
                                        "ActiveMQ Session: " + session.getSessionId()); 
                            } 
                            taskRunner = this.taskRunner; 
                        } 
                    } 
                    taskRunner.wakeup(); 
                } catch (InterruptedException e) { 
                    Thread.currentThread().interrupt(); 
                } 
            } else {// 异步分发 
                while (iterate()) { 
                } 
            } 
        } 
    }

  所以,对于异步分发的方式,会调用 ActiveMQSessionExecutor 中的 iterate方法,我们来看看这个方法的代码  iterate ():这个方法里面做两个事

  Ø 把消费者监听的所有消息转存到待消费队列中
  Ø 如果 messageQueue 还存在遗留消息,同样把消息分发(调度)出去

public boolean iterate() { 
        // Deliver any messages queued on the consumer to their listeners.
// 将消费者上排队的任何消息传递给它们的侦听器。 for (ActiveMQMessageConsumer consumer : this.session.consumers) { if (consumer.iterate()) { return true; } } // No messages left queued on the listeners.. so now dispatch messages // queued on the session
// 侦听器上没有留下排队等待的消息。现在分派消息 MessageDispatch message = messageQueue.dequeueNoWait(); if (message == null) { return false; } else {// 分发(调度)消息 dispatch(message); return !messageQueue.isEmpty(); } }

  dispatch(message);消息确认分发。通过ActiveMQSessionExecutor的dispatch 方法,转到了 ActiveMQMessageConsumer 消费者类的  dispatch 方法:

public void dispatch(MessageDispatch md) { 
        MessageListener listener = this.messageListener.get(); 
        try { 
            clearMessagesInProgress(); 
            clearDeliveredList(); 
            synchronized (unconsumedMessages.getMutex()) { 
                if (!unconsumedMessages.isClosed()) {// 判断消息是否为重发消息 
                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { 
                        if (listener != null && unconsumedMessages.isRunning()) { 
                          //我这边通过consumer.receive()处理消息,所以这里listener为空,走下面 
                        } else { 
                            if (!unconsumedMessages.isRunning()) { 
                                // delayed redelivery, ensure it can be re delivered 
                                session.connection.rollbackDuplicate(this, md.getMessage()); 
                            } 
 
                            if (md.getMessage() == null) { 
                                // End of browse or pull request timeout. 
                                unconsumedMessages.enqueue(md); 
                            } else { 
                                if (!consumeExpiredMessage(md)) { 
                                    unconsumedMessages.enqueue(md); 
                                    if (availableListener != null) { 
                                        availableListener.onMessageAvailable(this); 
                                    } 
      ......... 
} 

  最终会走入 unconsumedMessages.enqueue(md);添加消息。这里需要注意的是enqueue 方法:由于消费者可能处于阻塞状态,这里做了入队后回释放锁,也就是接触阻塞。

public void enqueue(MessageDispatch message) { 
        synchronized (mutex) { 
            list.addLast(message); 
            mutex.notify(); 
        } 
    }

  到这里为止,消息如何接受以及他的处理方式的流程,我们已经搞清楚了。其实在这个消息消费的流程中,已经在建立连接,创建消费者的时候就已经初始化好了消息队列了。结合上面的过程来看看整个消费流程的流程图

 消费端的 PrefetchSize:

  在消息发布的时候我们曾经研究过 producerWindowSize 。主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小,且只对异步发送有意义。对于客户端,也是类似存在这么一个属性来约束客户端的消息处理。activemq 的 consumer 端也有窗口机制,通过 prefetchSize 就可以设置窗口大小。不同的类型的队列,prefetchSize 的默认值也是不一样的.

  Ø 持久化队列和非持久化队列的默认值为 1000

  Ø 持久化 topic 默认值为 100

  Ø 非持久化队列的默认值为 Short.MAX_VALUE-1

 测试方法是在MQ上生产1000条消息,先后启动comsumer1,comsumer2 两个消费者并且循环调用1000次消费,我们会发现 comsumer2 拿不到消息,这个时候我们可以通过debug进入comsumer1 的ActiveMQConnect会发现里面有个属性的size=1000.其实就是这个prefetchSize,翻译过来是预取大小,消费端会根据prefetchSize 的大小批量获取数据。意思是在创建连接的时候会取获取1000条消息预加载到缓存中等待处理,这样子导致comsumer2去获取消息的时候 broker上已经空了。

prefetchSize 的设置方法:

  在 createQueue 中添加 consumer.prefetchSize,就可以看到效果

Destination destination=session.createQueue("myQueue?consumer.prefetchSize=10"); 

  既然有批量加载,那么一定有批量确认,这样才算是彻底的优化,这就涉及到 optimizeAcknowledge

  ActiveMQ 提供了 optimizeAcknowledge 来优化确认,它表示是否开启“优化ACK”,只有在为 true 的情况下,prefetchSize 以及optimizeAcknowledgeTimeout 参数才会有意义优化确认一方面可以减轻 client 负担(不需要频繁的确认消息)、减少通信开销,另一方面由于延迟了确认(默认 ack 了 0.65*prefetchSize 个消息才确认),这个在源码中有体现。在ActiveMQMessageConsumer#receive方法内的处理消息后的 afterMessageIsConsumed 方法内有一个判断:

if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || 
     (optimizeAcknowledgeTimeOut > 0 &&  
         System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { 
     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 
       if (ack != null) { 
             deliveredMessages.clear(); 
             ackCounter = 0; 
             session.sendAck(ack);//满足条件则发送批量应答ACK 
             optimizeAckTimestamp = System.currentTimeMillis(); 
       } 
       // AMQ-3956 - as further optimization send 
       // ack for expired msgs when there are any. 
       // This resets the deliveredCounter to 0 so that 
       // we won't sent standard acks with every msg just 
       // because the deliveredCounter just below 
       // 0.5 * prefetch as used in ackLater() 
       if (pendingAck != null && deliveredCounter > 0) { 
            session.sendAck(pendingAck); 
            pendingAck = null; 
            deliveredCounter = 0; 
       } 
}

  broker 再次发送消息时又可以批量发送如果只是开启了 prefetchSize,每条消息都去确认的话,broker 在收到确认后也只是发送一条消息,并不是批量发布,当然也可以通过设置 DUPS_OK_ACK来手动延迟确认, 我们需要在 brokerUrl 指定 optimizeACK 选项

ConnectionFactory connectionFactory= new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=10000");

  Ø 注意,如果 optimizeAcknowledge 为 true,那么 prefetchSize 必须大于 0. 当 prefetchSize=0 的时候,表示 consumer 通过 PULL 方式从 broker 获取消息.

  optimizeAcknowledge 和 prefetchSize 的作用,两者协同工作,通过批量获取消息、并延迟批量确认,来达到一个高效的消息消费模型。它比仅减少了客户端在获取消息时的阻塞次数,还能减少每次获取消息时的网络通信开销

  Ø 需要注意的是,如果消费端的消费速度比较高,通过这两者组合是能大大提升 consumer 的性能。如果 consumer 的消费性能本身就比较慢,设置比较大的 prefetchSize 反而不能有效的达到提升消费性能的目的。因为过大的prefetchSize 不利于 consumer 端消息的负载均衡。因为通常情况下,我们都会部署多个 consumer 节点来提升消费端的消费性能。这个优化方案还会存在另外一个潜在风险,当消息被消费之后还没有来得及确认时,client 端发生故障,那么这些消息就有可能会被重新发送给其他consumer,那么这种风险就需要 client 端能够容忍“重复”消息。

 消息的确认过程:

  消息确认有四种 ACK_MODE,分别是:

    1. AUTO_ACKNOWLEDGE = 1 自动确认

    2.CLIENT_ACKNOWLEDGE = 2 客户端手动确认

    3.DUPS_OK_ACKNOWLEDGE = 3 自动批量确认

    4.SESSION_TRANSACTED = 0 事务提交并确认

   ACK_MODE 的选择影响着消息消费流程的走向。虽然 Client 端指定了 ACK 模式,但是在 Client 与 broker 在交换 ACK 指令的时候,还需要告知 ACK_TYPE,ACK_TYPE 表示此确认指令的类型,不同的ACK_TYPE 将传递着消息的状态,broker 可以根据不同的 ACK_TYPE 对消息进行不同的操作。

ACK_TYPE应答类型:

  DELIVERED_ACK_TYPE = 0  消息"已接收",但尚未处理结束

  STANDARD_ACK_TYPE = 2  "标准"类型,通常表示为消息"处理成功",broker 端可以删除消息了

  POSION_ACK_TYPE = 1  消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者 DLQ(死信队列),在消息处理的时候,dispatch方法内会判断该消息是否为重发消息

if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { 
                        if (listener != null && unconsumedMessages.isRunning()) { 
                        // 这段为非重发消息,走else 
                    } else { 
                        // deal with duplicate delivery 
                        ConsumerId consumerWithPendingTransaction; 
                        if (redeliveryExpectedInCurrentTransaction(md, true)) { 
                            LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage()); 
                            if (transactedIndividualAck) { 
                                immediateIndividualTransactedAck(md); 
                            } else { 
                                session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1)); 
                            } 
                        } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) { 
                            LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction); 
                            session.getConnection().rollbackDuplicate(this, md.getMessage()); 
                            dispatch(md); 
                        } else {// 走POSION_ACK_TYPE 添加Active_DLQ 死信队列 
                            LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md); 
                            posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId()); 
                        } 
                    }

  REDELIVERED_ACK_TYPE = 3  消息需"重发",比如 consumer 处理消息时抛出了异常,broker 稍后会重新发送此消息

  INDIVIDUAL_ACK_TYPE = 4  表示只确认"单条消息",无论在任何 ACK_MODE 下

  UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一条消息在转发给“订阅者”时,发现此消息不符合 Selector 过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在 Broker 上确            认了消息)。

  Client 端在不同的 ACK 模式时,将意味着在不同的时机发送 ACK 指令,每个 ACK Command 中会包含 ACK_TYPE,那么 broker 端就可以根据 ACK_TYPE 来决定此消息的后续操作。在 afterMessageIsConsumed 消息接收处理后会根据条件来设置 ACK_TYPE.

消息的重发机制原理:

  在正常情况下,有几中情况会导致消息重新发送

  Ø 在事务性会话中,没有调用 session.commit 确认消息宕机或者调用session.rollback 方法回滚消息

  Ø 在非事务性会话中,ACK 模式为 CLIENT_ACKNOWLEDGE (客户端手动应答)的情况下,没有调用 session.commit或者调用了 recover 方法;

  一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费端会给 broker 发送一个”poison ack”表示这个消息有毒,告诉 broker 不要再发了。这个时候 broker 会把这个消息放到 DLQ(死信队列)。设置方法如下:

ActiveMQConnectionFactory connectionFactory1 = (ActiveMQConnectionFactory) connectionFactory; 
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
redeliveryPolicy.setMaximumRedeliveries(2); 
connectionFactory1.setRedeliveryPolicy(redeliveryPolicy);

死信队列:

  ActiveMQ 中默认的死信队列是 ActiveMQ.DLQ,如果没有特别的配置,有毒的消息都会被发送到这个队列。默认情况下,如果持久消息过期以后,也会被送到 DLQ 中。

   只要在处理消息的时候抛出一个异常就可以演示,会看到控制台对于失败消息会重发6次,登陆ActiveMQ控制台会看到一个 ActiveMQ.DLQ。在创建队列的时候可以直接指定从ActiveMQ.DLQ去消费消息。

  死信队列配置策略:

  缺省所有队列的死信消息都被发送到同一个缺省死信队列,不便于管理,可以通过 individualDeadLetterStrategy 或 sharedDeadLetterStrategy 策略来进行修改。在activemq.xml上

<destinationPolicy> 
      <policyMap> 
         <policyEntries> 
            <policyEntry topic=">" > 
                    <!-- The constantPendingMessageLimitStrategy is used to prevent 
                         slow topic consumers to block producers and affect other consumers 
                         by limiting the number of messages that are retained 
                         For more information, see: 
 
                         http://activemq.apache.org/slow-consumer-handling.html 
                    --> 
               <pendingMessageLimitStrategy> 
                 <constantPendingMessageLimitStrategy limit="1000"/> 
               </pendingMessageLimitStrategy> 
            </policyEntry> 
 
      // “>”表示对所有队列生效,如果需要设置指定队列,则直接写队列名称 
          <policyEntry queue=">"> 
             <deadLetterStrategy> 
           //queuePrefix:设置死信队列前缀 
           //useQueueForQueueMessage 设置队列保存到死信。 
              <individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"/> 
             </deadLetterStrategy> 
          </policyEntry> 
         </policyEntries> 
       </policyMap> 
</destinationPolicy> 

  自动丢弃过期消息

<deadLetterStrategy> 
    <sharedDeadLetterStrategy processExpired="false" /> 
</deadLetterStrategy> 

ActiveMQ VirtualTopic

  ActiveMQ支持的虚拟Destinations分为有两种,分别是

  • 虚拟主题(Virtual Topics)
  • 组合 Destinations(CompositeDestinations)

  这两种虚拟Destinations可以看做对简单的topic和queue用法的补充,基于它们可以实现一些简单有用的EIP功能,虚拟主题类似于1对多的分支功能+消费端的cluster+failover,组合Destinations类似于简单的destinations直接的路由功能。

组合队列(Composite Destinations):

  当你想把同一个消息一次发送到多个消息队列,那么可以在客户端使用组合队列。

// send to 3 queues as one logical operation 
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C"); 
producer.send(queue, someMessage);

  当然,也可以混合使用队列和主题,只需要使用前缀:queue:// 或 topic://

// send to queues and topic one logical operation 
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A"); 
producer.send(queue, someMessage);

虚拟主题(Virtual Topics):

  ActiveMQ中,topic只有在持久订阅(durablesubscription)下是持久化的。存在持久订阅时,每个持久订阅者,都相当于一个持久化的queue的客户端,它会收取所有消息。这种情况下存在两个问题:

  1. 同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。queue模式可以解决这个问题,broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能jms规范本身是没有的。
  2. 同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。

  为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能。使用起来非常简单。对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。

  默认虚拟主题的前缀是 :VirtualTopic.*  。自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.> 。自定义消费虚拟地址可以改,比如下面的配置就把它修改了。xml配置示例如下:

<broker xmlns="http://activemq.apache.org/schema/core"> 
    <destinationInterceptors> 
        <virtualDestinationInterceptor> 
            <virtualDestinations> 
                <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/><!-- 修改的Consumer的开头格式--> 
            </virtualDestinations> 
        </virtualDestinationInterceptor> 
    </destinationInterceptors> 
</broker>

  那么生产者发送的时候的代码如下:

Destination destination = session.createTopic("VirtualTopic.helloTopic");

  生产者是这样的:

Destination destination = session.createQueue("VirtualTopicConsumers.A.VirtualTopic.helloTopic"); 
Destination destination = session.createQueue("VirtualTopicConsumers.B.VirtualTopic.helloTopic");

ActiveMQ 静态网络配置:broker网络连接(broker的高性能方案):

  修改 activeMQ 服务器的 activeMQ.xml, 增加如下配置,这个配置只能实现单向连接,实现双向连接需要各个节点都配置如下配置。

<networkConnectors> 
    <networkConnector uri="static://(tcp://192.168.254.135:61616,tcp://192.168.254.136:61616)"/> 
</networkConnectors> 

  两个 Brokers 通过一个 static 的协议来进行网络连接。一个 Consumer 连接到BrokerB 的一个地址上,当 Producer 在 BrokerA 上以相同的地址发送消息,此时消息会被转移到 BrokerB 上,也就是说 BrokerA 会转发消息到BrokerB 上。

   在activeMQ中,进行了静态网络桥接的两台节点而言,当 Producer 在 BrokerA 上以相同的地址发送10条消息。一个 Consumer 连接到BrokerB去消费消息,当消费了一半的时候出现异常了,那么剩下来未处理的消息会被存放到 BrokerB 的待处理消息队列中,此时要通过BrokerA再去消费是消费不到的,万一此刻BrokerB 挂了,那么哪些没有消费的消息将会丢失。mq给我们提供了一个有效的消息回流机制。

<policyEntry queue=">" enableAudit="false"> 
    <networkBridgeFilterFactory> 
         <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> 
    </networkBridgeFilterFactory> 
</policyEntry>

ActiveMQ 的优缺点:

  ActiveMQ 采用消息推送方式,所以最适合的场景是默认消息都可在短时间内被消费。数据量越大,查找和消费消息就越慢,消息积压程度与消息速度成反比。

  缺点:

  • 吞吐量低。由于 ActiveMQ 需要建立索引,导致吞吐量下降。这是无法克服的缺点,只要使用完全符合 JMS 规范的消息中间件,就要接受这个级别的TPS。
  • 无分片功能。这是一个功能缺失,JMS 并没有规定消息中间件的集群、分片机制。而由于 ActiveMQ 是伟企业级开发设计的消息中间件,初衷并不是为了处理海量消息和高并发请求。如果一台服务器不能承受更多消息,则需要横向拆分。ActiveMQ 官方不提供分片机制,需要自己实现。

  适用场景:

  对 TPS 要求比较低的系统,可以使用 ActiveMQ 来实现,一方面比较简单,能够快速上手开发,另一方面可控性也比较好,还有比较好的监控机制和界面

  不适用的场景:

  消息量巨大的场景。ActiveMQ 不支持消息自动分片机制,如果消息量巨大,导致一台服务器不能处理全部消息,就需要自己开发消息分片功能。


发布评论
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

ActiveMQ消息的持久化策略知识解答
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。