知识总览

image-20240127195524891

同步调用

发出信息后,需要等待回复再去做别的事情

同步调用的优势

时效性强,等待到结果后才返回

同步调用的问题

  • 拓展性差
  • 性能下降
  • 级联失败问题

异步调用

例如支付服务不在调用业务关联度低的服务,而是发送消息通知到Broker

image-20240127201310246

image-20240127202023427

异步调用的问题

image-20240127202103952

MQ的技术选型

image-20240127203145451

RabbitMQ的安装

  • 上传mq.tar

  • docker load -i mq.tar
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    - ```bash
    # 安装容器
    docker run \
    -e RABBITMQ_DEFAULT_USER=itheima \
    -e RABBITMQ_DEFAULT_PASS=123321 \
    -v mq-plugins:/plugins \
    --name mq \
    --hostname mq \
    -p 15672:15672 \
    -p 5672:5672 \
    -d \
    rabbitmq:3.8-management
  • 浏览器打开192.168.186.100:15672,即可访问RabbitMQ的客户端,用户名为itheima,密码为123321

image-20240128111710515

SpringAMQP

使用官方文档的接口太繁琐,SpringAMQP封装好了一层接口

image-20240128120124814

SprinngAMQP入门案例

image-20240128120527743

  • 引入spring-amqp依赖

    image-20240128120907812

  • 配置RabbitMQ服务端地址

    image-20240128121000125

  • 发送消息

    image-20240128121034359

接收消息

image-20240128122154551

工作模型

多个消费者共享消息队列中的消息

模拟工作Queue

image-20240128124550458

消费者消息推送限制

image-20240128130212062

交换机(Exchange)

Fanout交换机

image-20240128153243949

示例

image-20240128153741293

编写消费者方法:

1
2
3
4
5
6
7
8
9
10
11
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {
System.out.println("消费者1收到了fanout.queue1消息.......:【"+msg+"】");
Thread.sleep(200);
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
System.out.println("消费者2收到了fanout.queue2消息.......:【"+msg+"】");
Thread.sleep(200);
}

编写发送者方法:

1
2
3
4
5
6
@Test
public void testSendFanout(){
String exchangeName = "hmall.fanout";
String msg = "hello,everyone";
rabbitTemplate.convertAndSend(exchangeName,null,msg);
}

Direct交换机

image-20240128155218316

Topic交换机

image-20240128161841180

image-20240128162151952

在java客户端中声明队列和交换机

image-20240128162534797

image-20240128191959299

应用

image-20240128210312866

image-20240128210029569

image-20240128210205833

image-20240128210243560

消息可靠性

主要体现在发送者可靠性、MQ可靠性以及消费者可靠性

发送者可靠性

image-20240128211719530

生产者(发送者)确认

image-20240128213142547

MQ可靠性

  • 数据持久性
  • lazyqueue
image-20240129191153803 image-20240129192244085

消费者的可靠性

消费者的可靠性包括消费者确认机制、失败重试机制、业务幂机制

消费者确认机制

image-20240129194711131

消费者失败重试机制

image-20240129204329486

image-20240129205455700

业务幂等性

image-20240129211443774

实现业务幂等

方案一:唯一消息id

image-20240129211709032

业务判断

image-20240129213715670

思维拓展

image-20240129214607888

延迟消息

生产者发送消息时制定一个时间,消费者不能立刻收到消息,而是在指定时间之后才收到消息

image-20240129220048624

延迟消息实现

包括两种:死信交换机以及延迟消息插件

死信交换机

image-20240129221249069

延迟消息插件

image-20240130111149493

image-20240130111523388

取消超时订单

image-20240130113320483