前言
- 同步和异步通信的优缺点
- MQ简介
- RabbitMQ的五种消息模式及其Spring整合
MQ简介
同步通讯
微服务的Feign调用属于同步通讯。
优点:响应比较及时。
同步通讯存在一些问题:
- 耦合度高:每次新需求,都要修改原来的代码
- 性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
- 资源浪费:调用链中每个服务在等待响应的过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
- 级联失败:如果服务提供者出现问题,所有的调用方都会跟着出问题
异步通讯
事件驱动,先有事件,再有执行。
异步通讯的优势:
- 服务解耦
- 性能提升,吞吐量提高
- 故障隔离
- 流量削峰:Broker中可以缓存相应的数据
异步通信的缺点:
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
MQ对比
MessageQueue,消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
RabbitMQ入门
- 基于Erlang语言开发
- 开源: RabbitMQ官网
安装
1 2 3 4 5 6 7 8 9 10 11 12
| docker pull rabbitmq:3-management
docker run \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
|
RabbitMQ概念和结构
- Channel:操作MQ的工具
- exchange:路由消息到队列中
- Queue:缓存消息
- Virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
RabbitMQ实践
Java官方实现(BasicQueue Demo)
- 消费者和生产者都要连接MQ,并且创建好队列
- 消费者和生产者都要声明队列,防止消费到不存在的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("admin"); factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null);
String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】");
channel.close(); connection.close();
channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。");
|
SpringAMQP
- AMQP:Advanced Message Queuing Protocol,用于在应用程序之间传递业务消息的开发标准,与平台和语言无关
- Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息
- spring-amqp:API抽象
- spring-rabbit:底层实现
- SpringAMQP官网
依赖和配置
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
1 2 3 4 5 6 7
| spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: / username: admin password: admin
|
简单代码(BasicQueue)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
|
1 2 3 4 5 6 7 8
| @Component public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { System.out.println("消费者接收到simple.queue代码: 【" + msg + "】"); } }
|
RabbitMQ消费模式
基本消息队列(BasicQueue)
一对一处理,阅后即焚。
- publisher:消息发布,将消息发送到队列queue
- queue:消息队列,负责接收并缓存消息
- consumer:订阅队列,处理队列中的消息
工作消息队列(WorkQueue)
一对多处理,多个消费者消费一个队列,消费不会重复,消费者是合作关系。
目的是为了提高吞吐。
模拟实践
publisher每秒产生50条消息
consumer中定义两个监听者,监听这个队列
消费者1每秒处理50条,消费者2每秒消费5条
- 预期1秒消费完
1 2 3 4 5 6 7 8 9
| @Test public void testWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message__"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
|
1 2 3 4 5 6 7 8 9 10 11
| @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到simple.queue代码: 【" + msg + "】" + LocalTime.now()); Thread.sleep(20); }
@RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到simple.queue代码: 【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
|
- 问题出现
- 消息预取:RabbitMQ会把消息先分配到了各个消费者,然后消费者慢慢消费
- 导致两个Consumer预取的消息一样多,但是Consumer2消费能力弱,导致了超时
- 问题解决:设置预取上限为1
1
| spring.rabbitmq.listener.simple.prefetch=1
|
发布订阅(Publish、Subscribe)
允许将一个消息发送给多个队列,实现方式时加入了exchange(交换机)。
exchange只负责转发,不负责存储
Fanout Exchange:广播
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
案例
1 2 3 4 5 6
| @Test public void testSendFanoutQueue() throws InterruptedException { String exchangeName = "itcast.fanout"; String message = "hello, fanoutMsg"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Configuration public class FanoutConfig {
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); }
@Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }
@Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到fanout.queue1代码: 【" + msg + "】" + LocalTime.now()); }
@RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.err.println("消费者2接收到fanout.queue1代码: 【" + msg + "】" + LocalTime.now()); }
|
Direct Exchange:路由
会将接收到的消息,根据规则路由到指定的Queue,称为路由
- 每一个Queue都与Exchange设置一个或多个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
1 2 3 4 5 6 7 8
| @Test public void testSendDirectQueue() { String exchangeName = "memo.direct"; String message = "hello, yellow"; rabbitTemplate.convertAndSend(exchangeName, "yellow", message); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = "memo.direct"), value = @Queue("direct.queue1"), key = {"red", "blue"} )) public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1消息: 【" + msg + "】" + LocalTime.now()); }
@RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = "memo.direct"), value = @Queue("direct.queue2"), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg) { System.err.println("消费者2接收到direct.queue2消息: 【" + msg + "】" + LocalTime.now()); }
|
- @RabbitListener
- @Queue
- @Exchange(value=””, type=””)
- key={“”,””}
Topic Exchange:主题
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.
分割
1 2 3 4 5 6 7 8
| @Test public void testSendTopicQueue() { String exchangeName = "memo.topic"; String message = "hello, US weather"; rabbitTemplate.convertAndSend(exchangeName, "us.weather", message); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = "memo.topic", type = ExchangeTypes.TOPIC), value = @Queue("topic.queue1"), key = "#.weather" )) public void listenTopicQueue1(String msg) { System.err.println("topic.queue1消息: 【" + msg + "】" + LocalTime.now()); }
@RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = "memo.topic", type = ExchangeTypes.TOPIC), value = @Queue("topic.queue2"), key = "china.#" )) public void listenTopicQueue2(String msg) { System.err.println("topic.queue2消息: 【" + msg + "】" + LocalTime.now()); }
|
SpringAMQP消息转换器
在SpringAMQP的发送方法中,接受消息的类型是Object,SpringAMQP帮我们序列化成字节后发送。
- 默认序列化方式:
`java-serialized-object
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter
来处理的,默认的方式是基于JDK的ObjectOutputStream完成的序列化。
- 可以定义一个MessageConverter类型的Bean就可以替换
JSON方式序列化
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
1 2 3 4
| @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
|
- 接收:和发送的配置一样,然后用Map或者Object等接收就行