Hands-on Practice (Part 2)

References:

Redis in Action study notes series

Redis in Action

https://github.com/josiahcarlson/redis-in-action

Jedis

Add the dependency to a Maven project

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.1</version>
</dependency>

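Publish/Subscribe in Practice

Use Redis to publish and subscribe to messages between two platforms.

Extend the JedisPubSub class and override its callbacks with your own subscription logic

In this example we implement a subscriber, RedisSubscriber. Nothing is overridden on the publishing side.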

package com.maple.demo_for_springfox.test_for_jedis.service;

import redis.clients.jedis.JedisPubSub;

/**
 * @author Chenyufeng
 * @title: RedisSubscriber
 * @projectName demo_for_spring
 * @description: Subscriber that logs subscription events and received messages
 * @date 2020/8/18 11:01
 */
public class RedisSubscriber extends JedisPubSub {
    // Delegates unchanged to JedisPubSub; kept only to show where subscribing can be customized.
    @Override
    public void subscribe(String... channels) {
        super.subscribe(channels);
    }

    // Called when a subscription to a channel is confirmed.
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        super.onSubscribe(channel, subscribedChannels);
        System.out.println(String.format("Subscribed: channel %s, channel count %d", channel, subscribedChannels));
    }

    // Called each time a message arrives on a subscribed channel.
    @Override
    public void onMessage(String channel, String message) {
        super.onMessage(channel, message);
        System.out.println(String.format("Consumed message: channel %s, message: %s", channel, message));
    }
}

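Start the subscriber thread

Subscribing through a Jedis object blocks the calling thread while it waits for messages, so we receive messages on a dedicated thread.

Where it blocks: the subscribe call, which waits to receive data from Redis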

Jedis jedis = jedisPool.getResource();
// Blocks here: subscribe() does not return until the subscriber unsubscribes.
jedis.subscribe(redisSubscriber, "ADD_ORG");
jedis.close();
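
To see why a blocking receive loop needs its own thread, here is a minimal sketch that uses only the Java standard library. It is an analogy, not Jedis code: a BlockingQueue stands in for the Redis connection, take() stands in for the blocking read inside subscribe(), and the class name BlockingReceiverSketch, its methods, and the stop sentinel are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a subscriber: the queue plays the role of the Redis
// connection, and take() plays the role of the blocking read inside subscribe().
class BlockingReceiverSketch {
    private static final String STOP = "__stop__"; // sentinel that ends the loop

    private final BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    private final List<String> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch finished = new CountDownLatch(1);

    // Runs the blocking receive loop on its own thread so the caller stays free.
    void start() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String message = channel.take(); // blocks until a message arrives
                    if (STOP.equals(message)) {
                        break;
                    }
                    received.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        }, "receiver");
        worker.setDaemon(true);
        worker.start();
    }

    // Called from any other thread; never blocks on the receive loop.
    void publish(String message) {
        channel.add(message);
    }

    // Asks the loop to exit and waits up to timeoutMillis; returns true if it exited.
    boolean stop(long timeoutMillis) {
        channel.add(STOP);
        try {
            return finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> received() {
        return received;
    }

    public static void main(String[] args) {
        BlockingReceiverSketch sketch = new BlockingReceiverSketch();
        sketch.start();          // returns immediately; only the worker thread blocks
        sketch.publish("hello"); // the calling thread remains free to publish
        sketch.publish("world");
        sketch.stop(1000);
        System.out.println(sketch.received());
    }
}
```

If the loop ran on the calling thread instead, publish() could never be reached, which is the same reason the subscribe call above is moved to a separate thread.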

Receiver/subscriber thread

// Runs the blocking subscribe call on its own thread.
class mSub implements Runnable {
    private final JedisPool jedisPool;
    private final String channelRedis;
    private final RedisSubscriber redisSubscriber;

    public mSub(JedisPool jedisPool, String channelRedis, RedisSubscriber subscriber) {
        this.jedisPool = jedisPool;
        this.channelRedis = channelRedis;
        this.redisSubscriber = subscriber;
    }

    @Override
    public void run() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            // Blocks until the subscriber unsubscribes from every channel.
            jedis.subscribe(redisSubscriber, channelRedis);
        } catch (Exception e) {
            System.out.println("Subscription failed: " + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close(); // return the connection to the pool
            }
        }
    }
}

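Publishing messages

Note that jedis2 must be closed with close() after each publish. Otherwise the connection is never returned to the pool, and once the pool is exhausted a later getResource() call blocks. See the JedisPool resource configuration in the appendix.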

public static void main(String[] args) {
    JedisPool jedisPool = new JedisPool("172.18.21.79");

    RedisSubscriber redisSubscriber = new RedisSubscriber();

    // mSub is an inner class of TestJedis, so it needs an enclosing instance.
    mSub sub = new TestJedis().new mSub(jedisPool, "ADD_ORG", redisSubscriber);
    Thread thread = new Thread(sub);
    thread.start();

    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNext()) {
        // System.out.println("mSub thread state: " + thread.getState());
        String msg = scanner.next();

        System.out.println("new Message: " + msg);

        Jedis jedis2 = jedisPool.getResource();
        jedis2.publish("ADD_ORG", msg);
        jedis2.close(); // Without this, later getResource() calls cannot obtain a connection.
    }
}

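Appendix:

How the client processes received messages:

After subscribing, the Jedis client processes replies as follows:

The statement List<Object> reply = client.getRawObjectMultiBulkReply(); blocks until a published message (or another pub/sub reply) arrives.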

private void process(Client client) {

    do {
        List<Object> reply = client.getRawObjectMultiBulkReply();
        final Object firstObj = reply.get(0);
        if (!(firstObj instanceof byte[])) {
            throw new JedisException("Unknown message type: " + firstObj);
        }
        final byte[] resp = (byte[]) firstObj;
        if (Arrays.equals(SUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bchannel = (byte[]) reply.get(1);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            onSubscribe(strchannel, subscribedChannels);
        } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bchannel = (byte[]) reply.get(1);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            onUnsubscribe(strchannel, subscribedChannels);
        } else if (Arrays.equals(MESSAGE.raw, resp)) {
            final byte[] bchannel = (byte[]) reply.get(1);
            final byte[] bmesg = (byte[]) reply.get(2);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
            onMessage(strchannel, strmesg);
        } else if (Arrays.equals(PMESSAGE.raw, resp)) {
            final byte[] bpattern = (byte[]) reply.get(1);
            final byte[] bchannel = (byte[]) reply.get(2);
            final byte[] bmesg = (byte[]) reply.get(3);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
            onPMessage(strpattern, strchannel, strmesg);
        } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPSubscribe(strpattern, subscribedChannels);
        } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPUnsubscribe(strpattern, subscribedChannels);
        } else if (Arrays.equals(PONG.raw, resp)) {
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPong(strpattern);
        } else {
            throw new JedisException("Unknown message type: " + firstObj);
        }
    } while (isSubscribed());

    /* Invalidate instance since this thread is no longer listening */
    this.client = null;
}
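
The branching in process() can be easier to follow in a reduced form. The sketch below is a hypothetical, standard-library-only dispatcher that handles just three reply kinds (subscribe, message, pmessage). It relies on the Redis pub/sub reply layout, where the first array element names the reply kind. ReplyDispatchSketch, its events list, and its helper methods are illustrative names, not part of Jedis.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical dispatcher: it mirrors the shape of the branching in
// process() for three reply kinds only and is not the Jedis implementation.
class ReplyDispatchSketch {
    // Records each callback invocation so the routing can be inspected.
    final List<String> events = new ArrayList<>();

    void onSubscribe(String channel, int count) {
        events.add("subscribe:" + channel + ":" + count);
    }

    void onMessage(String channel, String message) {
        events.add("message:" + channel + ":" + message);
    }

    void onPMessage(String pattern, String channel, String message) {
        events.add("pmessage:" + pattern + ":" + channel + ":" + message);
    }

    // Routes one reply to a callback based on its first element.
    void dispatch(List<Object> reply) {
        Object first = reply.get(0);
        if (!(first instanceof byte[])) {
            throw new IllegalArgumentException("Unknown message type: " + first);
        }
        String kind = decode(first);
        switch (kind) {
            case "subscribe": // [kind, channel, subscription count]
                onSubscribe(decode(reply.get(1)), ((Long) reply.get(2)).intValue());
                break;
            case "message": // [kind, channel, payload]
                onMessage(decode(reply.get(1)), decode(reply.get(2)));
                break;
            case "pmessage": // [kind, pattern, channel, payload]
                onPMessage(decode(reply.get(1)), decode(reply.get(2)), decode(reply.get(3)));
                break;
            default:
                throw new IllegalArgumentException("Unknown message type: " + kind);
        }
    }

    private static String decode(Object raw) {
        return raw == null ? null : new String((byte[]) raw, StandardCharsets.UTF_8);
    }

    static byte[] bytes(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ReplyDispatchSketch dispatcher = new ReplyDispatchSketch();
        dispatcher.dispatch(Arrays.<Object>asList(bytes("subscribe"), bytes("ADD_ORG"), 1L));
        dispatcher.dispatch(Arrays.<Object>asList(bytes("message"), bytes("ADD_ORG"), bytes("hello")));
        System.out.println(dispatcher.events);
    }
}
```

The real method additionally handles unsubscribe, psubscribe, punsubscribe, and pong replies, and keeps looping while isSubscribed() is true.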

Stepping further into the source code shows where the client blocks while reading data.


Resource pool configuration:

JedisPool instantiation


JedisPool accepts parameters that configure its pooled resources. The defaults, defined in GenericObjectPoolConfig, are:

/**
* The default value for the {@code maxTotal} configuration attribute.
* @see GenericObjectPool#getMaxTotal()
*/
public static final int DEFAULT_MAX_TOTAL = 8;

/**
* The default value for the {@code maxIdle} configuration attribute.
* @see GenericObjectPool#getMaxIdle()
*/
public static final int DEFAULT_MAX_IDLE = 8;

/**
* The default value for the {@code minIdle} configuration attribute.
* @see GenericObjectPool#getMinIdle()
*/
public static final int DEFAULT_MIN_IDLE = 0;

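At most eight resources can be borrowed at the same time. If eight have been borrowed and none has been returned with close(), the next getResource() call blocks.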
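
The exhaustion behavior can be reproduced without a Redis server. The sketch below is a hypothetical model that uses only the Java standard library: a Semaphore stands in for the cap of eight, and BoundedPoolSketch, Resource, and tryBorrow are illustrative names rather than Jedis or Commons Pool APIs. Unlike a pool that waits indefinitely, the sketch uses a timeout so that the demonstration terminates.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a connection pool capped at maxTotal resources.
// A Semaphore models the cap; this is not how JedisPool is implemented.
class BoundedPoolSketch {
    private final Semaphore permits;

    BoundedPoolSketch(int maxTotal) {
        this.permits = new Semaphore(maxTotal);
    }

    // Handle given to the borrower; close() returns the slot exactly once.
    final class Resource implements AutoCloseable {
        private boolean closed;

        @Override
        public synchronized void close() {
            if (!closed) {
                closed = true;
                permits.release();
            }
        }
    }

    // Waits up to timeoutMillis for a free slot; returns null if the pool stays exhausted.
    Resource tryBorrow(long timeoutMillis) {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS) ? new Resource() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BoundedPoolSketch leaky = new BoundedPoolSketch(8); // same cap as DEFAULT_MAX_TOTAL

        // Leak eight resources by never closing them.
        for (int i = 0; i < 8; i++) {
            leaky.tryBorrow(100);
        }
        System.out.println("ninth borrow succeeded: " + (leaky.tryBorrow(100) != null));

        // With try-with-resources every borrow is returned, so the loop never starves.
        BoundedPoolSketch healthy = new BoundedPoolSketch(8);
        int served = 0;
        for (int i = 0; i < 100; i++) {
            try (Resource resource = healthy.tryBorrow(100)) {
                if (resource != null) {
                    served++;
                }
            }
        }
        System.out.println("borrows served when closing each time: " + served);
    }
}
```

The same discipline applies to the publishing loop: each borrowed connection has to be closed before the next iteration, or the ninth borrow has nothing left to take.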
