创建监听器
package com.ssyouxia.listener;
/**
* Created by lianfangfang on 2019/2/28.
*/
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Redis {@link MessageListener} that counts how many messages it has received.
 *
 * <p>The message payload and the subscription pattern are ignored; only the
 * number of {@link #onMessage(Message, byte[])} invocations is tracked. The
 * counter is an {@link AtomicInteger}, so individual increments and reads are
 * atomic.
 */
public class RedisMessageListener implements MessageListener {

    /** Number of messages received so far; the reference itself never changes. */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Records the arrival of one message by incrementing the counter.
     *
     * @param message the received message (not inspected)
     * @param pattern the pattern that matched the channel (not inspected)
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        count.incrementAndGet();
    }

    /**
     * Returns the number of messages received so far.
     *
     * @return the current value of the message counter
     */
    public int getCount() {
        return count.get();
    }
}
配置并注册监听器
package com.ssyouxia.config;
import com.ssyouxia.listener.RedisMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
* Created by lianfangfang on 2019/2/28.
*/
/**
 * Spring configuration for the Redis publish/subscribe demo.
 *
 * <p>Imports {@code SpringDataRedisConfig} and defines two beans: a
 * {@link RedisMessageListener} and a {@link RedisMessageListenerContainer}
 * that subscribes that listener to the {@value #CHANNEL_NAME} channel.
 */
@Configuration
@Import(SpringDataRedisConfig.class)
public class RedisPubsubConfiguration {

    /** Name of the Redis channel the listener is subscribed to. */
    private static final String CHANNEL_NAME = "test-channel";

    /**
     * Creates the listener container and subscribes {@link #listener()} to
     * the {@value #CHANNEL_NAME} channel.
     *
     * <p>No {@code @Autowired} is needed here: parameters of a {@code @Bean}
     * method are resolved from the application context by Spring itself.
     *
     * @param connectionFactory the Redis connection factory the container uses
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer container(
            final JedisConnectionFactory connectionFactory) {
        final RedisMessageListenerContainer container =
                new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listener(), new ChannelTopic(CHANNEL_NAME));
        return container;
    }

    /**
     * Creates the message-counting listener.
     *
     * <p>The concrete type is declared as the return type (rather than the
     * {@code MessageListener} interface) so that injection points asking for
     * {@link RedisMessageListener} can be matched from the bean definition
     * alone. The narrower type is still assignable to {@code MessageListener}.
     *
     * @return a new {@link RedisMessageListener}
     */
    @Bean
    public RedisMessageListener listener() {
        return new RedisMessageListener();
    }
}
测试类
package com.ssyouxia.config;
import com.ssyouxia.listener.RedisMessageListener;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.concurrent.Callable;
import static com.jayway.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* Created by lianfangfang on 2019/2/28.
*/
/**
 * Integration test for the Redis publish/subscribe wiring declared in
 * {@link RedisPubsubConfiguration}.
 *
 * <p>Publishes three messages to {@code "test-channel"} and waits for the
 * injected {@link RedisMessageListener} to report that it has counted them.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = RedisPubsubConfiguration.class)
public class RedisPublishSubscriberTestCase {

    @Autowired
    private RedisTemplate<String, String> template;

    @Autowired
    private RedisMessageListener listener;

    /**
     * Verifies that the listener starts at zero and reaches a count of three
     * within one second of the three messages being published.
     */
    @Test
    public void testPublishSubscribe() {
        assertThat(listener.getCount(), equalTo(0));

        final String[] payloads = {
            "Test Message 1!",
            "Test Message 2!",
            "Test Message 3!"
        };
        for (final String payload : payloads) {
            template.convertAndSend("test-channel", payload);
        }

        // The count is polled instead of asserted right away.
        // NOTE(review): presumably the container delivers messages on another
        // thread, so the count may lag behind the publish calls - confirm.
        await().atMost(1, SECONDS).until(receivedCount(), equalTo(3));
    }

    /**
     * Builds the probe that is polled while waiting for delivery.
     *
     * @return a callable that reports the listener's current message count
     */
    private Callable<Integer> receivedCount() {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return listener.getCount();
            }
        };
    }
}
代码下载地址
https://gitee.com/liyuhang712/wsocketdemo