0%

RabbitMQ基础

前言

  1. 同步和异步通信的优缺点
  2. MQ简介
  3. RabbitMQ的五种消息模式及其Spring整合

MQ简介

同步通讯

微服务的Feign调用属于同步通讯。

优点:响应比较及时。

同步通讯存在一些问题:

  • 耦合度高:每次新需求,都要修改原来的代码
  • 性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
  • 资源浪费:调用链中每个服务在等待响应的过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
  • 级联失败:如果服务提供者出现问题,所有的调用方都会跟着出问题

异步通讯

事件驱动,先有事件,再有执行。

异步通讯的优势:

  • 服务解耦
  • 性能提升,吞吐量提高
  • 故障隔离
  • 流量削峰:Broker中可以缓存相应的数据

异步通信的缺点:

  • 依赖于Broker的可靠性、安全性、吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

MQ对比

MessageQueue,消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

RabbitMQ入门

  1. 基于Erlang语言开发
  2. 开源: RabbitMQ官网

安装

1
2
3
4
5
6
7
8
9
10
11
12
# 拉镜像
docker pull rabbitmq:3-management
# 运行容器
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq \
--hostname mq1 \
-p 15672:15672 \ # 15672 是管理地址
-p 5672:5672 \ # rabbitmq端口
-d \
rabbitmq:3-management

RabbitMQ概念和结构

  • Channel:操作MQ的工具
  • exchange:路由消息到队列中
  • Queue:缓存消息
  • Virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

RabbitMQ实践

Java官方实现(BasicQueue Demo)

  • 消费者和生产者都要连接MQ,并且创建好队列
    • 消费者和生产者都要声明队列,防止消费到不存在的队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);

// publisher
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");

// 5.关闭通道和连接
channel.close();
connection.close();

// consumer
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");

SpringAMQP

  • AMQP:Advanced Message Queuing Protocol,用于在应用程序之间传递业务消息的开发标准,与平台和语言无关
  • Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息
    • spring-amqp:API抽象
    • spring-rabbit:底层实现
  • SpringAMQP官网
  • 监听器容器
  • RabbitmqTemplate

依赖和配置

1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin

简单代码(BasicQueue)

  • Publisher
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
  • consumer
1
2
3
4
5
6
7
8
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue代码: 【" + msg + "】");
}
}

RabbitMQ消费模式

基本消息队列(BasicQueue)

一对一处理,阅后即焚。

  • publisher:消息发布,将消息发送到队列queue
  • queue:消息队列,负责接收并缓存消息
  • consumer:订阅队列,处理队列中的消息

  • 注意:消息被消费,队列中的消息就会被删除

工作消息队列(WorkQueue)

一对多处理,多个消费者消费一个队列,消费不会重复,消费者是合作关系。

目的是为了提高吞吐。

模拟实践

  1. publisher每秒产生50条消息

  2. consumer中定义两个监听者,监听这个队列

  3. 消费者1每秒处理50条,消费者2每秒消费5条

  4. 预期1秒消费完
  • Publisher
1
2
3
4
5
6
7
8
9
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message__";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
  • Consumer
1
2
3
4
5
6
7
8
9
10
11
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到simple.queue代码: 【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到simple.queue代码: 【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
  • 问题出现
    • 消息预取:RabbitMQ会把消息先分配到了各个消费者,然后消费者慢慢消费
    • 导致两个Consumer预取的消息一样多,但是Consumer2消费能力弱,导致了超时
  • 问题解决:设置预取上限为1
1
spring.rabbitmq.listener.simple.prefetch=1

发布订阅(Publish、Subscribe)

允许将一个消息发送给多个队列,实现方式时加入了exchange(交换机)。

exchange只负责转发,不负责存储

Fanout Exchange:广播

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

案例

  • 实现如下的结构

  • publisher
1
2
3
4
5
6
@Test
public void testSendFanoutQueue() throws InterruptedException {
String exchangeName = "itcast.fanout";
String message = "hello, fanoutMsg";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
  • consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 配置类
@Configuration
public class FanoutConfig {

@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcast.fanout");
}

@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}

@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}

// 消费
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到fanout.queue1代码: 【" + msg + "】" + LocalTime.now());
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.err.println("消费者2接收到fanout.queue1代码: 【" + msg + "】" + LocalTime.now());
}

Direct Exchange:路由

会将接收到的消息,根据规则路由到指定的Queue,称为路由

  • 每一个Queue都与Exchange设置一个或多个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

  • publisher
1
2
3
4
5
6
7
8
@Test
public void testSendDirectQueue() {
String exchangeName = "memo.direct";
String message = "hello, yellow";
// rabbitTemplate.convertAndSend(exchangeName, "blue", message);
rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
// rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
  • Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "memo.direct"), // 默认Direct
value = @Queue("direct.queue1"),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1消息: 【" + msg + "】" + LocalTime.now());
}

@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "memo.direct"), // 默认Direct
value = @Queue("direct.queue2"),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {
System.err.println("消费者2接收到direct.queue2消息: 【" + msg + "】" + LocalTime.now());
}
  • @RabbitListener
    • @Queue
    • @Exchange(value=””, type=””)
    • key={“”,””}

Topic Exchange:主题

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

  • publisher
1
2
3
4
5
6
7
8
@Test
public void testSendTopicQueue() {
String exchangeName = "memo.topic";
String message = "hello, US weather";
// rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
// rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
rabbitTemplate.convertAndSend(exchangeName, "us.weather", message);
}
  • comsumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "memo.topic", type = ExchangeTypes.TOPIC),
value = @Queue("topic.queue1"),
key = "#.weather"
))
public void listenTopicQueue1(String msg) {
System.err.println("topic.queue1消息: 【" + msg + "】" + LocalTime.now());
}

@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "memo.topic", type = ExchangeTypes.TOPIC),
value = @Queue("topic.queue2"),
key = "china.#"
))
public void listenTopicQueue2(String msg) {
System.err.println("topic.queue2消息: 【" + msg + "】" + LocalTime.now());
}

SpringAMQP消息转换器

在SpringAMQP的发送方法中,接受消息的类型是Object,SpringAMQP帮我们序列化成字节后发送。

  • 默认序列化方式:`java-serialized-object

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,默认的方式是基于JDK的ObjectOutputStream完成的序列化。

  • 可以定义一个MessageConverter类型的Bean就可以替换

JSON方式序列化

  • 发送
1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
1
2
3
4
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
  • 接收:和发送的配置一样,然后用Map或者Object等接收就行