在Spring boot中,使用Redis来实现消息的发布与订阅,这里写了一个发布者和两个订阅者,发布者通过convertAndSend向不同的Channel中发布消息,订阅者通过RedisMessageListenerContainer充当消息侦听器容器,MessageListenerAdapter充当消息侦听适配器,将消息委托给MessageListener进行消费。
Publisher
1.pom.xml
<!-- Publisher module dependencies: Redis starter, Jackson for JSON values, and the test starter. -->
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.11.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>2.4.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>
2.application.yml
# Publisher application settings: application name, Redis connection, HTTP port.
spring:
  application:
    name: redis-publisher
  redis:
    host: localhost
    port: 6379
server:
  port: 8080
3.RedisConfig
public class RedisConfig {
// 自定义RedisTemplate
"all") (
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
// 为了方便开发,一般直接使用<String,Object>
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
// Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
//String序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//key采取String的方式序列化
template.setKeySerializer(stringRedisSerializer);
// hash的key采用String方式序列化
template.setHashKeySerializer(stringRedisSerializer);
// value的序列化方式选择jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
4.Publisher
/**
 * Publishes messages to Redis channels through the injected {@code RedisTemplate}.
 */
@Component
public class Publisher {

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * @param redisTemplate template used to serialize and send messages
     */
    public Publisher(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Sends a message to the given channel.
     *
     * @param channel name of the Redis channel to publish to
     * @param object  payload; passed to {@code convertAndSend} unchanged
     */
    public void publish(String channel, Object object) {
        redisTemplate.convertAndSend(channel, object);
    }
}
5.Test
12 (classes = PublisherApplication.class)
public class Test1 {
private Publisher publisher;
public void test() {
System.out.println("发布消息");
publisher.publish("channel1","今天还好");
publisher.publish("channel2","来对a啊");
}
}
Subscriber
1.pom.xml
<!-- Subscriber module dependencies: Redis starter and Jackson for JSON values. -->
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.11.3</version>
    </dependency>
</dependencies>
2.application.yml
# Subscriber application settings: application name, Redis connection, HTTP port.
spring:
  application:
    name: redis-subscriber1
  redis:
    host: localhost
    port: 6379
server:
  port: 8081
3.RedisConfig
public class RedisConfig {
// 自定义RedisTemplate
"all") (
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
// 为了方便开发,一般直接使用<String,Object>
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
// Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
//String序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//key采取String的方式序列化
template.setKeySerializer(stringRedisSerializer);
// hash的key采用String方式序列化
template.setHashKeySerializer(stringRedisSerializer);
// value的序列化方式选择jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
4.MessageConfig
/**
 * Wires the Redis subscription: a listener container bound to "channel1" and the adapter
 * that hands incoming messages to {@code Subscriber}.
 */
@Configuration
public class Messageconfig {

    private final Subscriber subscriber;
    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * @param subscriber    listener that consumes incoming messages
     * @param redisTemplate template whose connection factory the container reuses
     */
    public Messageconfig(Subscriber subscriber, RedisTemplate<String, Object> redisTemplate) {
        this.subscriber = subscriber;
        this.redisTemplate = redisTemplate;
    }

    /**
     * Creates the message listener container and registers the adapter on "channel1".
     *
     * @param listenerAdapter adapter to register as the channel's listener
     * @return the configured container
     */
    @Bean
    RedisMessageListenerContainer container(MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisTemplate.getConnectionFactory());
        // Only one topic is registered, so the single-topic overload replaces the list.
        container.addMessageListener(listenerAdapter, new PatternTopic("channel1"));
        return container;
    }

    /**
     * Creates the listener adapter that delegates messages to the subscriber.
     *
     * @return adapter wrapping {@code subscriber}
     */
    @Bean
    MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(subscriber);
    }
}
5.Subscriber
public class Subscriber implements MessageListener {
private RedisTemplate redisTemplate;
public Subscriber(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
// 每次新消息到达时,都会调用
public void onMessage(Message message, byte[] bytes) {
RedisSerializer<?> keySerializer = redisTemplate.getKeySerializer();
RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
Object channel = keySerializer.deserialize(message.getChannel());
Object body = valueSerializer.deserialize(message.getBody());
System.out.println("Channel:"+ channel);
System.out.println("消息:"+String.valueOf(body));
}
}
另一个Subscriber与上面的基本一致,只是监听的channel不同(例如监听channel2)。