在上一章的指南中,我们写了一个命名队列:生产者往该命名队列发送消息、消费从从该命名队列中消费消息。在本章中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务。工作队列(即任务队列)的主要思想是避免立即执行那些需要等他们执行完成的资源密集型任务。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列,后台运行的工作进程将取出任务并执行完成。如果你启动了多个工作者,这些任务将在多个工作者之间分享。

  这个概念也即我们说的异步,在项目中,有时候一个简单的Web请求,后台要做一系统的操作,这时候,如果后台执行完成之后再给前台返回消息将会导致浏览器页面等待从而出现假死状态。因此,通常的做法是,在这个Http请求到后台,后台获取到正确的参数等信息后立即给前台返回一个成功标志,然后后台异步地进行后续的操作。

1、准备

  本章中,我们将发送字符串消息来模拟复杂的任务。这里因为没有一个真实的复杂任务,因此用Thread.sleep()方法来模拟复杂耗时的任务。我们用字符串中的含点(“.")的数量来表示任务的复杂程度,一个点表示一秒钟的耗时,例如:一个发送”Hello ...“字符串的任务将会耗时3秒钟。

  我们可以直接将上一章中的Send.java代码拿过来修改,允许从命令行发送消息。本程序将会把任务调试到工作队列,因此,我们将类名改为NewTask.java:

String message = String.join(" ", argv); 
 
channel.basicPublish("", "hello", null, message.getBytes()); 
System.out.println(" [x] Sent '" + message + "'");

   此时完整的NewTask.java代码为:

 1 public class NewTask { 
 2  
 3     private final static String QUEUE_NAME = "hello"; 
 4  
 5     public static void main(String[] argv) throws IOException, TimeoutException { 
 6  
 7         ConnectionFactory connectionFactory = new ConnectionFactory(); 
 8         connectionFactory.setHost("HOST"); 
 9  
10         try(Connection connection = connectionFactory.newConnection(); 
11             Channel channel = connection.createChannel()) { 
12  
13             channel.queueDeclare(QUEUE_NAME,false,false,false,null); 
14  
15             String message = String.join(" ", argv); 
16              
17             channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); 
18             System.out.println(" [x] Sent '" + message + "'"); 
19         } 
20     } 
21 }

  之前的Recv.java也要做一些修改:模拟字符串消息中的每个点耗时1秒钟,它将处理传送过来的消息并执行任务,因此,我们修改为Work.java:

 1 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 
 2   String message = new String(delivery.getBody(), "UTF-8"); 
 3  
 4   System.out.println(" [x] Received '" + message + "'"); 
 5   try { 
 6     doWork(message); 
 7   } finally { 
 8     System.out.println(" [x] Done"); 
 9   } 
10 }; 
11 boolean autoAck = true; // acknowledgment is covered below 
12 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

  我们模拟执行过程中耗时的伪任务:

1 private static void doWork(String task) throws InterruptedException { 
2     for (char ch: task.toCharArray()) { 
3         if (ch == '.') Thread.sleep(1000); 
4     } 
5 }

  此时完整的Work.java为:

 1 public class Worker { 
 2     private final static String TASK_QUEUE_NAME = "hello"; 
 3  
 4     public static void main(String[] args) throws Exception { 
 5  
 6         ConnectionFactory connectionFactory = new ConnectionFactory(); 
 7         connectionFactory.setHost("HOST"); 
 8  
 9         Connection connection = connectionFactory.newConnection(); 
10         Channel channel = connection.createChannel(); 
11         channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null); 
12  
13         DeliverCallback deliverCallback = (consumerTag, delivery) -> { 
14             String message = new String(delivery.getBody(), "UTF-8"); 
15  
16             System.out.println(" [x] Received '" + message + "'"); 
17             try { 
18                 doWork(message); 
19             } catch (InterruptedException e) { 
20                 e.printStackTrace(); 
21             } finally { 
22                 System.out.println(" [x] Done"); 
23             } 
24         }; 
25  
26         boolean autoAck = true; // acknowledgment is covered below 
27         channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); 
28     } 
29  
30     private static void doWork(String task) throws InterruptedException { 
31         for (char ch: task.toCharArray()) { 
32             if (ch == '.') Thread.sleep(1000); 
33         } 
34     } 
35 }

2、循环调度

  使用工作队列的优点之一是能够轻松地进行并行化操作。假设我们在做一个后台日志收集系统,我们可以很容易地增加更多的Worker从而提高系统性能。

  首先,我们同时启动两个Worker,同样地,我这里也放到IDEA中启动:

  

  

  接下来,我们先后启动5个Task,并分别通过main()参数传入五个字符串消息:

1 First message. 
2 Second message.. 
3 Third message... 
4 Fourth message.... 
5 Fifth message.....

  

   执行五个发送任务之后,来看一下两个Worker都接收到了什么样的消息:

          

  默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息。这种消息的调度方式称之为循环调度,你可以开启更多的Worker来进行测试。

3、消息回执

  因为消费者执行一个任务会有时间耗时,假设一个消费者在执行一个任务执行一半的时候挂掉了将会怎样?消息会不会因此丢失?在我们目前的代码里,一旦RabbitMq将一条消息转发给了一个消费者后,将会立即将消息删除(注意Worker.java里的autoAck),因此,在我们上面例子里,如kill掉一个正在处理数据的Worker,那么该数据将会丢失。不仅如此,所有那些指派给该Worker的还未处理的消息也会丢失。

  但在实际工作的,我们并不希望一个Worker挂掉之后就会丢失数据,我们希望的是:如果该Worker挂掉了,所有转发给该Worker的消息将会重新转发给其他Worker进行处理(包括处理了一半的消息)。为了确保一条消息永不丢失,RabbitMq支持消息回执。消费者在接收到一条消息,并且成功处理完成之后会给RabbitMq回发一条确认ack确认消息,RabbitMq此时才会删除该条消息。

  如果一个Worker正在处理一条消息时挂掉了(信道关闭、连接关闭、TCP连接丢失),它将没有机会发送ack回执,RabbitMq就认为该消息没有消费成功,于是便会将该消息重新放到队列中,如果此时有其他消费者还是在线状态,RabbitMq会立即将该条消息再转发给其他在线的消费者。这种机制可以保证任何消息都不会丢失。

  默认情况下,需要手动进行消息确认,在前面的例子里,我们通过autoAck=true显示地关闭了手动消息确认,因此,RabbitMq将采用自动消息确认的机制。现在,我们修改我们的程序,采用手动发送回执的方式,当我们完成对消息的处理后,再手动发送回执确认:

 1 channel.basicQos(1); // accept only one unack-ed message at a time (see below) 
 2  
 3 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 
 4   String message = new String(delivery.getBody(), "UTF-8"); 
 5  
 6   System.out.println(" [x] Received '" + message + "'"); 
 7   try { 
 8     doWork(message); 
 9   } finally { 
10     System.out.println(" [x] Done"); 
11     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
12   } 
13 }; 
14 boolean autoAck = false; 
15 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

  ack发送信道必须和接收消息的信道(channel)是同一个,如果尝试通过一个不同的信道发送ack回执,将会抛出channel等级协议异常(官网说会抛出异常,但是我在实际测试中并没有抛异常,只是该条消息得不到回执,从而也无法删除)。

  一个常见的错误是忘了手动回执,虽然只是一个简单的错误,但是带来的后果却是严重的,它将导致已经消费掉的消费不会被删除,并且当消费该消息的消费者在退出之后,RabbitMq会将该条消息重新进行转发,内存将被慢慢耗尽。我们可以通过正面的命令来检查这种错误:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

    

  该命令有三列内容,第一列是在监听的队列名称,第二列是Ready状态的消息数量,第三列是Unacked的消息数量。

4、消息的持久化

  在3中我们讲解了如何保证当消费者挂掉之后消息不被丢失,但是,如果RabbitMq服务或者部署RabbitMq的服务器挂掉了之后,消息仍然会丢失。当RabbitMq崩溃之后,它将会忘记所有的队列和消息,除非,有什么机制让RabbitMq将队列信息和消息保存下来。

  要确保消息和队列不会丢失,我们必须要确保两件事情。

  首先,我们要确保RabbitMq永远不丢失队列,要做到这点,我们在定义的时候就需要告诉RabbitMq它是需要持久化的,通过指定durable参数实现:

boolean durable = true; 
channel.queueDeclare("hello", durable, false, false, null);

  虽然这个命令本身是正确的,但是在我们目前它不能工作。因为我们前面已经定义了一个非持久化的hello队列,RabbitMq不允许重新定义一个已经存在的队列(用不同的参数),否则会抛出异常:

Exception in thread "main" java.io.IOException 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) 
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962) 
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333) 
    at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:23) 
    Suppressed: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10) at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:396) at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:292) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:607) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:541) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68) at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:29)

  要么重启RabbitMq让该临时队列消失,要么在控制台将该队列删除,或者重新创建一个新的队列:

1 boolean durable = true; 
2 channel.queueDeclare("task_queue", durable, false, false, null);

  生产者和消费者要做同步修改。

  上面这一步,我们保证了队列(task_quee)的持久化,此时,即便RabbitMq崩溃了也不会丢失该队列,当RabbitMq重启后将自动重新加载该队列。

  其次,我们需要确保我们的消息也被持久化,要做到这一点,在生产者发布消息的时候需要指定消息的属性为:PERSISTENT_TEXT_PLAIN。

1 import com.rabbitmq.client.MessageProperties; 
2  
3 channel.basicPublish("", "task_queue", 
4             MessageProperties.PERSISTENT_TEXT_PLAIN, 
5             message.getBytes());

  注意,即便设置了消息的持久化属性也不能保证消息会被100%地写入到磁盘中,因为RabbitMq在接收到消息和写入到磁盘不是同步的,有可能消息只是被写入到缓存中而还没来和及写入磁盘的时候,RabbitMq崩溃了,此时也会丢失消息。但无论如何,比前面简单的消息队列已经强大了很多。

5、公平调度

  您可能已经注意到,任务调度仍然不能完全按照我们希望的方式工作。举个例子,在只有两个Worker的环境中,奇数的消息比较重,偶数的消息比较轻时,一个Worker将会一直处于忙碌状态,而另一个Worker将会一直处于空闲状态,但RabbitMq并不知道这种情况,它会依然均衡地向两个Worker传递消息。

  发生这种情况是因为,当一个消息进入队列之后,RabbitMq只是盲目地将该第n个消息转发给第n个消费者,它并不关注每个消费者发了多少个回执。

  为了解决这个问题,我们可以通过调用basicQos方法,给它传入1。这将告诉RabbitMq不要同时给一个队列转发多于1条的消息,换句话说,在一个消费者没有完成并回执前一条消息时,不要再给它转发其他消息。

1 int prefetchCount = 1; 
2 channel.basicQos(prefetchCount);

6、完整的代码

  一、NewTask.java

 1 import com.rabbitmq.client.Channel; 
 2 import com.rabbitmq.client.Connection; 
 3 import com.rabbitmq.client.ConnectionFactory; 
 4 import com.rabbitmq.client.MessageProperties; 
 5  
 6 public class NewTask { 
 7  
 8   private static final String TASK_QUEUE_NAME = "task_queue"; 
 9  
10   public static void main(String[] argv) throws Exception { 
11     ConnectionFactory factory = new ConnectionFactory(); 
12     factory.setHost("localhost"); 
13     try (Connection connection = factory.newConnection(); 
14          Channel channel = connection.createChannel()) { 
15         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 
16  
17         String message = String.join(" ", argv); 
18  
19         channel.basicPublish("", TASK_QUEUE_NAME, 
20                 MessageProperties.PERSISTENT_TEXT_PLAIN, 
21                 message.getBytes("UTF-8")); 
22         System.out.println(" [x] Sent '" + message + "'"); 
23     } 
24   } 
25  
26 }

  二、Worker.java

  

 1 import com.rabbitmq.client.Channel; 
 2 import com.rabbitmq.client.Connection; 
 3 import com.rabbitmq.client.ConnectionFactory; 
 4 import com.rabbitmq.client.DeliverCallback; 
 5  
 6 public class Worker { 
 7  
 8   private static final String TASK_QUEUE_NAME = "task_queue"; 
 9  
10   public static void main(String[] argv) throws Exception { 
11     ConnectionFactory factory = new ConnectionFactory(); 
12     factory.setHost("localhost"); 
13     final Connection connection = factory.newConnection(); 
14     final Channel channel = connection.createChannel(); 
15  
16     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 
17     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 
18  
19     channel.basicQos(1); 
20  
21     DeliverCallback deliverCallback = (consumerTag, delivery) -> { 
22         String message = new String(delivery.getBody(), "UTF-8"); 
23  
24         System.out.println(" [x] Received '" + message + "'"); 
25         try { 
26             doWork(message); 
27         } finally { 
28             System.out.println(" [x] Done"); 
29             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
30         } 
31     }; 
32     channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); 
33   } 
34  
35   private static void doWork(String task) { 
36     for (char ch : task.toCharArray()) { 
37         if (ch == '.') { 
38             try { 
39                 Thread.sleep(1000); 
40             } catch (InterruptedException _ignored) { 
41                 Thread.currentThread().interrupt(); 
42             } 
43         } 
44     } 
45   } 
46 }

  至此,工作队列模式讲解完了,下一章,将讲解发布-订阅模式。


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!