References:
Redis in Action study notes series
Redis in Action
https://github.com/josiahcarlson/redis-in-action
Jedis: adding the dependency to a Maven project

```xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.1</version>
</dependency>
```
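Publish/subscribe in practice
Use Redis to publish and subscribe to messages between two platforms.
Extend the JedisPubSub class and override its callbacks with your own subscription logic.
In this example we implement a subscriber, RedisSubscriber; nothing is overridden on the publishing side.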
```java
package com.maple.demo_for_springfox.test_for_jedis.service;

import redis.clients.jedis.JedisPubSub;

public class RedisSubscriber extends JedisPubSub {

    // Delegates to JedisPubSub unchanged; kept only as a hook for custom logic.
    @Override
    public void subscribe(String... channels) {
        super.subscribe(channels);
    }

    // Called once the server confirms the subscription to a channel.
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        super.onSubscribe(channel, subscribedChannels);
        System.out.println(String.format("Subscribed: channel %s, channel count %d", channel, subscribedChannels));
    }

    // Called for every message published to a subscribed channel.
    @Override
    public void onMessage(String channel, String message) {
        super.onMessage(channel, message);
        System.out.println(String.format("Consumed message: channel %s, message: %s", channel, message));
    }
}
```
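Starting the subscriber thread
Subscribing through a Jedis object blocks the calling thread while it waits for messages, so we receive messages on a dedicated thread.
Where the blocking happens: when the client reads data from Redis (see the appendix).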
```java
Jedis jedis = jedisPool.getResource();
jedis.subscribe(redisSubscriber, "ADD_ORG"); // blocks for as long as the subscription is active
jedis.close();
```
Receiver/subscriber thread
```java
// Inner class of TestJedis; runs the blocking subscribe call on its own thread.
class mSub implements Runnable {

    private final JedisPool jedisPool;
    private String channelRedis;
    private RedisSubscriber redisSubscriber;

    public mSub(JedisPool jedisPool, String channelRedis, RedisSubscriber subscriber) {
        this.jedisPool = jedisPool;
        this.channelRedis = channelRedis;
        this.redisSubscriber = subscriber;
    }

    @Override
    public void run() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            // Blocks here while the subscription is active.
            jedis.subscribe(redisSubscriber, channelRedis);
        } catch (Exception e) {
            System.out.println("Subscription failed!");
        } finally {
            // Always return the connection to the pool.
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
```
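To see the blocking-receive pattern in isolation, without a Redis server, here is a JDK-only sketch. It is an analogy, not Jedis code: a LinkedBlockingQueue stands in for the channel, take() stands in for the blocking subscribe call, and the class name BlockingReceiverSketch and the STOP sentinel are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** JDK-only analogy for a blocking subscriber; not part of Jedis. */
class BlockingReceiverSketch {

    // Hypothetical sentinel that tells the receiver thread to stop.
    static final String STOP = "__STOP__";

    /** Publishes the given messages to a receiver thread and returns what it received. */
    static List<String> run(List<String> toPublish) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();

        Thread receiver = new Thread(() -> {
            try {
                while (true) {
                    String msg = channel.take(); // blocks until a message arrives
                    if (STOP.equals(msg)) {
                        return;
                    }
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();

        // The calling thread stays free to publish while the receiver blocks.
        for (String m : toPublish) {
            channel.add(m);
        }
        channel.add(STOP);

        try {
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b"))); // prints [a, b]
    }
}
```

If take() ran on the main thread instead, the publishing loop would never be reached, which is the same reason the subscribe call is moved onto its own thread.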
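Publishing messages
Note that jedis2 must be closed with close() after publishing; otherwise later getResource() calls will block because no resource is available. See the JedisPool resource definition in the pool configuration section.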
```java
// Inside class TestJedis; needs java.util.Scanner,
// redis.clients.jedis.Jedis and redis.clients.jedis.JedisPool.
public static void main(String[] args) {
    JedisPool jedisPool = new JedisPool("172.18.21.79");
    RedisSubscriber redisSubscriber = new RedisSubscriber();

    // Start the subscriber on its own thread because subscribe() blocks.
    mSub sub = new TestJedis().new mSub(jedisPool, "ADD_ORG", redisSubscriber);
    Thread thread = new Thread(sub);
    thread.start();

    // Publish each token read from standard input.
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNext()) {
        String msg = scanner.next();
        System.out.println("new Message: " + msg);
        Jedis jedis2 = jedisPool.getResource();
        try {
            jedis2.publish("ADD_ORG", msg);
        } finally {
            jedis2.close(); // return the connection to the pool, even if publish fails
        }
    }
}
```
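Appendix: how the client handles received messages
After subscribing, the Jedis client processes replies as shown below.
The statement List<Object> reply = client.getRawObjectMultiBulkReply(); blocks until the next reply, such as a published message, arrives.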
```java
private void process(Client client) {
    do {
        List<Object> reply = client.getRawObjectMultiBulkReply();
        final Object firstObj = reply.get(0);
        if (!(firstObj instanceof byte[])) {
            throw new JedisException("Unknown message type: " + firstObj);
        }
        final byte[] resp = (byte[]) firstObj;
        if (Arrays.equals(SUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bchannel = (byte[]) reply.get(1);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            onSubscribe(strchannel, subscribedChannels);
        } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bchannel = (byte[]) reply.get(1);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            onUnsubscribe(strchannel, subscribedChannels);
        } else if (Arrays.equals(MESSAGE.raw, resp)) {
            final byte[] bchannel = (byte[]) reply.get(1);
            final byte[] bmesg = (byte[]) reply.get(2);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
            onMessage(strchannel, strmesg);
        } else if (Arrays.equals(PMESSAGE.raw, resp)) {
            final byte[] bpattern = (byte[]) reply.get(1);
            final byte[] bchannel = (byte[]) reply.get(2);
            final byte[] bmesg = (byte[]) reply.get(3);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
            onPMessage(strpattern, strchannel, strmesg);
        } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPSubscribe(strpattern, subscribedChannels);
        } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPUnsubscribe(strpattern, subscribedChannels);
        } else if (Arrays.equals(PONG.raw, resp)) {
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPong(strpattern);
        } else {
            throw new JedisException("Unknown message type: " + firstObj);
        }
    } while (isSubscribed());
    this.client = null;
}
```
Stepping further into the source code shows where the blocking read of the data takes place.
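The dispatch step of the process loop above can be tried out without a Redis connection. The following is a simplified sketch, not the Jedis source: it handles only the subscribe and message reply kinds, records events in a list instead of invoking callbacks, and the class and helper names (ReplyDispatchSketch, bytes, text) are invented for illustration. It assumes replies shaped like Redis pub/sub replies: a kind, a channel, and then either a channel count or a payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified illustration of dispatching pub/sub replies; not the Jedis implementation. */
class ReplyDispatchSketch {

    // Records what a real subscriber would receive through its callbacks.
    final List<String> events = new ArrayList<>();

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String text(Object raw) {
        return raw == null ? null : new String((byte[]) raw, StandardCharsets.UTF_8);
    }

    /** Inspects the first element of the reply and routes the rest accordingly. */
    void dispatch(List<Object> reply) {
        Object first = reply.get(0);
        if (!(first instanceof byte[])) {
            throw new IllegalArgumentException("Unknown message type: " + first);
        }
        byte[] kind = (byte[]) first;
        if (Arrays.equals(bytes("subscribe"), kind)) {
            // [kind, channel, number of subscribed channels]
            events.add("subscribe:" + text(reply.get(1)) + ":" + reply.get(2));
        } else if (Arrays.equals(bytes("message"), kind)) {
            // [kind, channel, payload]
            events.add("message:" + text(reply.get(1)) + ":" + text(reply.get(2)));
        } else {
            throw new IllegalArgumentException("Unknown message type: " + text(kind));
        }
    }

    public static void main(String[] args) {
        ReplyDispatchSketch d = new ReplyDispatchSketch();
        d.dispatch(Arrays.<Object>asList(bytes("subscribe"), bytes("ADD_ORG"), 1L));
        d.dispatch(Arrays.<Object>asList(bytes("message"), bytes("ADD_ORG"), bytes("hello")));
        System.out.println(d.events); // prints [subscribe:ADD_ORG:1, message:ADD_ORG:hello]
    }
}
```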
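Pool configuration:
JedisPool instantiation
The resource parameters can be set when the JedisPool is created. The defaults, defined in GenericObjectPoolConfig, are as follows: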
```java
public static final int DEFAULT_MAX_TOTAL = 8;
public static final int DEFAULT_MAX_IDLE = 8;
public static final int DEFAULT_MIN_IDLE = 0;
```
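At most eight resources can be borrowed at a time. If eight resources have been borrowed and none of them has been closed, the next request for a resource blocks.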
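The effect of the limit can be reproduced with a toy pool built on the JDK alone. This is a stand-in under stated assumptions, not the Jedis or commons-pool implementation: a Semaphore with eight permits plays the role of the pool, giveBack() plays the role of close(), and tryBorrow() takes a timeout so that the demo finishes instead of waiting forever. All names here (BoundedPoolSketch, tryBorrow, giveBack) are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for a bounded connection pool; not the Jedis/commons-pool code. */
class BoundedPoolSketch {

    private final Semaphore permits;

    BoundedPoolSketch(int maxTotal) {
        this.permits = new Semaphore(maxTotal);
    }

    /** Tries to borrow a resource, waiting at most waitMillis; returns false on timeout. */
    boolean tryBorrow(long waitMillis) {
        try {
            return permits.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns a resource to the pool, the analogue of close() on a pooled connection. */
    void giveBack() {
        permits.release();
    }

    public static void main(String[] args) {
        BoundedPoolSketch pool = new BoundedPoolSketch(8);
        for (int i = 0; i < 8; i++) {
            pool.tryBorrow(0); // the first eight borrows succeed immediately
        }
        System.out.println(pool.tryBorrow(100)); // prints false: all eight are in use
        pool.giveBack();                          // "close" one of them
        System.out.println(pool.tryBorrow(100)); // prints true: a slot was freed
    }
}
```

The ninth borrow only succeeds after one resource has been given back, which is why every borrowed connection in the examples above is closed once it is no longer needed.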