目录

chen 的个人博客

VX:TiAmo151028
Phone:13403656751
Email:zxydczzs@gmail.com

X

Redis Redisson 发布/订阅模式 实现

一、Redis 的发布订阅模式

什么是发布订阅

任务队列:顾名思义,就是"传递消息的队列"。与任务队列进行交互的实体有两类,一类是生产者(producer),另一类则是消费者(consumer)。生产者将需要处理的任务放入任务队列中,而消费者则不断地从任务队列中读取任务信息并执行。

发布订阅模式

其实从 Pub/Sub 的机制来看,它更像是一个广播系统,多个订阅者(Subscribe)可以订阅多个频道(Channel),多个发布者(Publisher)可以往多个频道(Channel)中发布消息。

可以这么简单的理解

  1. Subscribe:收音机,可以收到多个频道,并以队列方式显示;
  2. Publisher:电台,可以往不同的 FM 频道中发消息;
  3. Channel:不同频率的 FM 频道;

特点

  1. 发送者(发布者)不是计划发送消息给特定的接受者(订阅者),而是发布的消息分到不同的频道,不需要知道什么样的订阅者订阅;
  2. 订阅者对一个或多个频道感兴趣,只需接受感兴趣的消息,不需要知道什么样的发布者发布的;

业务场景

  1. 说一个目前已经遇到的场景,客户端通过 websocket 和服务器进行长连接,服务端通过 websocket 通知到客户端的方式是由业务触发,通过 RPC 的方式通知到消息服务,在由消息服务通知到客户端,完成通信;

  2. 那么此时存在的问题就是,如果部署了多台消息服务实例,但是客户端只和其中一台实例建立了连接关系,那么业务究竟该调用哪台消息服务进行消息通知呢?且也不会为了实现此功能而麻烦到 RPC 调用到一台具体的实例,因为它一定是负载均衡的;

  3. 那么我们可以换个角度去想这件事,就是业务并不需要知道这个客户端具体在那台实例中,而是通知到所有的消息服务实例,由它们自己去判断这个客户端是不是自己持有,若持有则进行之后的操作,如果不持有那就不需要关注,按部就班做自己的事情即可;

  4. 那么此时就衍生出了一个词,叫做"广播模式",顾名思义就是需要一个生产者去发布一条消息,若干个消费者去消费这条消息,带入到上方的场景当中,就是说我需要一个生产者来发布消息,而消费者则是这些消息服务,生产者发布的消息模型我们是可以自己定义的,且要定义的通用一些,保证它的可扩展性,现在生产者和消费者都有了,那么什么时候去发布消息呢?在上面的场景中就是有业务方去触发生产者发布消息,消费者拿到消息后来判断当前要推送消息的客户端是否自己持有即可;

  5. 图解如下

    image.png

二、Demo 实现

一、说明
  1. 此实例使用 Redisson 实现,什么是 Redisson?Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson 提供了使用 Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
  2. 此实例只是简单的实例;
二、服务端
1        // 创建服务端
2        RTopic testListener = redisClient.getTopic("testListener");
3        // 消息发布
4        RFuture<Long> future = testListener.publishAsync("hello");
三、客户端
 1        RTopic testListener = redisClient.getTopic("testListener");
 2        // 异步监听
 3        testListener.addListenerAsync(String.class, new MessageListener<String>() {
 4            @Override
 5            public void onMessage(CharSequence channel, String msg) {
 6                // 监听到消息
 7                log.info(msg + ", RedisListener01");
 8            }
 9        });
10        log.info("....init RedisListener01 addListener.....");

三、优缺点

优点
  1. 轻量化,上手简单,快速接入;
  2. 支持模糊模式订阅,客户端可以订阅一个带*号的模式,如果推送的主题与这个模式匹配,那么订阅这个主题的客户端就会收到消息;
缺点
  1. 客户端执行订阅指令之后,就会进入订阅状态,之后就只能接受 subscribe、psubscribe、unsubscribe、punsubscribe 这四个命令;
  2. 新订阅的客户端,是无法收到这个频道之前的消息,这是因为 Redis 并不会对发布的消息持久化,相对于很多专业 MQ,比如 kafka、rocketmq、pulsar 等来说,redis 发布订阅功能就显得有点简陋了;
  3. 由于消息不会持久化,所以不建议在消息可靠性要求高的场景下使用,因为可能某次主备切换或者断线重连时很容易出现消息丢失的情况,这种情况就建议使用专业的 MQ,有离线消息上线推送的功能;
  4. 没有类似 ACK 的机制,生产者不保证消费者是否成功接收并处理此消息;

标题:Redis Redisson 发布/订阅模式 实现
作者:zzzzchen
地址:https://dczzs.com/articles/2022/11/24/1669300453144.html