Spring中怎么实现响应式Redis交互
本文将模拟一个用户服务,并使用Redis作为数据存储服务器。涉及两个java bean,用户与权益
public class User {private long id;
private String name;
// 标签
private String label;
// 收货地址经度
private Double deliveryAddressLon;
// 收货地址维度
private Double deliveryAddressLat;
// 最新签到日
private String lastSigninDay;
// 积分
private Integer score;
// 权益
private List<
Rights>
rights;
...
}
public class Rights {
private Long id;
private Long userId;
private String name;
...
} 启动
引入依赖
<dependency>
<
groupId>
org.springframework.boot<
/groupId>
<
artifactId>
spring-boot-starter-data-redis-reactive<
/artifactId>
<
/dependency>
添加Redis配置
spring.redis.host=192.168.56.102spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000
SpringBoot启动
@SpringBootApplicationpublic class UserServiceReactive {
public static void main(String[] args) {
new SpringApplicationBuilder(
UserServiceReactive.class)
.web(WebApplicationType.REACTIVE).run(args);
}
}
应用启动后,Spring会自动生成ReactiveRedisTemplate(它的底层框架是Lettuce)。ReactiveRedisTemplate与RedisTemplate使用类似,但它提供的是异步的,响应式Redis交互方式。这里再强调一下,响应式编程是异步的,ReactiveRedisTemplate发送Redis请求后不会阻塞线程,当前线程可以去执行其他任务。等到Redis响应数据返回后,ReactiveRedisTemplate再调度线程处理响应数据。响应式编程可以通过优雅的方式实现异步调用以及处理异步结果,正是它的最大的意义。
序列化ReactiveRedisTemplate默认使用的序列化是Jdk序列化,我们可以配置为json序列化
@Beanpublic RedisSerializationContext redisSerializationContext() {
RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
builder.key(StringRedisSerializer.UTF_8);
builder.value(RedisSerializer.json());
builder.hashKey(StringRedisSerializer.UTF_8);
builder.hashValue(StringRedisSerializer.UTF_8);
return builder.build();
}
@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
RedisSerializationContext serializationContext = redisSerializationContext();
ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
return reactiveRedisTemplate;
}
builder.hashValue方法指定Redis列表值的序列化方式,由于本文Redis列表值只存放字符串,所以还是设置为StringRedisSerializer.UTF_8。
基本数据类型ReactiveRedisTemplate支持Redis字符串,散列,列表,集合,有序集合等基本的数据类型。本文使用散列保存用户信息,列表保存用户权益,其他基本数据类型的使用本文不展开。
public Mono<Boolean>
save(User user) {
ReactiveHashOperations<
String, String, String>
opsForHash = redisTemplate.opsForHash();
Mono<
Boolean>
userRs = opsForHash.putAll("
user:"
+ user.getId(), beanToMap(user));
if(user.getRights() != null) {
ReactiveListOperations<
String, Rights>
opsForRights = redisTemplate.opsForList();
opsForRights.leftPushAll("
user:rights:"
+ user.getId(), user.getRights()).subscribe(l ->
{
logger.info("
add rights:{}"
, l);
});
}
return userRs;
}
beanToMap方法负责将User类转化为map。
HyperLogLogRedis HyperLogLog结构可以统计一个集合内不同元素的数量。使用HyperLogLog统计每天登录的用户量
public Mono<Long>
login(User user) {
ReactiveHyperLogLogOperations<
String, Long>
opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
return opsForHyperLogLog.add("
user:login:number:"
+ LocalDateTime.now().toString().substring(0, 10), user.getId());
} BitMap
Redis BitMap(位图)通过一个Bit位表示某个元素对应的值或者状态。由于Bit是计算机存储中最小的单位,使用它进行储存将非常节省空间。使用BitMap记录用户本周是否有签到
public void addSignInFlag(long userId) {String key = "
user:signIn:"
+ LocalDateTime.now().getDayOfYear()/7 + (userId >
>
16);
redisTemplate.opsForValue().setBit(
key, userId &
0xffff , true)
.subscribe(b ->
logger.info("
set:{},result:{}"
, key, b));
}
userId高48位用于将用户划分到不同的key,低16位作为位图偏移参数offset。offset参数必须大于或等于0,小于2^32(bit 映射被限制在 512 MB 之内)。
GeoRedis Geo可以存储地理位置信息,并对地理位置进行计算。如查找给定范围内的仓库信息
public Flux getWarehouseInDist(User u, double dist) {ReactiveGeoOperations<
String, String>
geo = redisTemplate.opsForGeo();
Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
RedisGeoCommands.GeoRadiusCommandArgs args =
RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
return geo.radius("
warehouse:address"
, circle, args);
}
warehouse:address这个集合中需要先保存好仓库地理位置信息。ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。
LuaReactiveRedisTemplate也可以执行Lua脚本。下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。
public Flux<String>
addScore(long userId) {
DefaultRedisScript<
String>
script = new DefaultRedisScript<
>
();
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("
/signin.lua"
)));
List<
String>
keys = new ArrayList<
>
();
keys.add(String.valueOf(userId));
keys.add(LocalDateTime.now().toString().substring(0, 10));
return redisTemplate.execute(script, keys);
}
signin.lua内容如下
local score=redis.call('hget'
,'
user:'
..KEYS[1],'
score'
)
local day=redis.call('
hget'
,'
user:'
..KEYS[1],'
lastSigninDay'
)
if(day==KEYS[2])
then
return '
0'
else
redis.call('
hset'
,'
user:'
..KEYS[1],'
score'
, score+1,'
lastSigninDay'
,KEYS[2])
return '
1'
end Stream
Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。
Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。
下面定义一个Stream消费者,负责处理接收到的权益数据
@Componentpublic class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);
@Autowired
private RedisConnectionFactory redisConnectionFactory;
private StreamMessageListenerContainer<
String, ObjectRecord<
String, Rights>
>
container;
// Stream队列
private static final String STREAM_KEY = "
stream:user:rights"
;
// 消费组
private static final String STREAM_GROUP = "
user-service"
;
// 消费者
private static final String STREAM_CONSUMER = "
consumer-1"
;
@Autowired
@Qualifier("
reactiveRedisTemplate"
)
private ReactiveRedisTemplate redisTemplate;
public void run(ApplicationArguments args) throws Exception {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<
String, ObjectRecord<
String, Rights>
>
options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(100) //一批次拉取的最大count数
.executor(Executors.newSingleThreadExecutor()) //线程池
.pollTimeout(Duration.ZERO) //阻塞式轮询
.targetType(Rights.class) //目标类型(消息内容的类型)
.build();
// 创建一个消息监听容器
container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
// prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream
prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
.subscribe(stream ->
{
// 为Stream创建一个消费者,并绑定处理类
container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
new StreamMessageListener());
container.start();
});
}
@Override
public void destroy() throws Exception {
container.stop();
}
// 查找Stream信息,如果不存在,则创建Stream
private Mono<
StreamInfo.XInfoStream>
prepareStreamAndGroup(ReactiveStreamOperations<
String, ?, ?>
ops, String stream, String group) {
// info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
return ops.info(stream).onErrorResume(err ->
{
logger.warn("
query stream err:{}"
, err.getMessage());
// createGroup方法创建Stream
return ops.createGroup(stream, group).flatMap(s ->
ops.info(stream));
});
}
// 消息处理对象
class StreamMessageListener implements StreamListener<
String, ObjectRecord<
String, Rights>
>
{
public void onMessage(ObjectRecord<
String, Rights>
message) {
// 处理消息
RecordId id = message.getId();
Rights rights = message.getValue();
logger.info("
receive id:{},rights:{}"
, id, rights);
redisTemplate.opsForList().leftPush("
user:rights:"
+ rights.getUserId(), rights).subscribe(l ->
{
logger.info("
add rights:{}"
, l);
});
}
}
}
下面看一下如何发送信息
public Mono<RecordId>
addRights(Rights r) {
String streamKey = "
stream:user:rights"
;
//stream key
ObjectRecord<
String, Rights>
record = ObjectRecord.create(streamKey, r);
Mono<
RecordId>
mono = redisTemplate.opsForStream().add(record);
return mono;
}
创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。
Sentinel、ClusterReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。Sentinel配置如下
spring.redis.sentinel.master=mymasterspring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=
spring.redis.sentinel.nodes配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。
Cluster配置如下
spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true
如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。开启spring.redis.lettuce.cluster.refresh.adaptive配置,Lettuce可以定时刷新Redis Cluster集群缓存信息,动态改变客户端的节点情况,完成故障转移。
暂时未发现ReactiveRedisTemplate实现pipeline,事务的方案。
概述:
Spring reactive技术的出现大大提高了Web应用程序的性能,但是如何与其他技术进行集成也是一个重要的问题。本文将介绍如何在Spring Boot应用中使用响应式Redis来提高系统性能。
1. 引入依赖
Spring提供了官方的响应式Redis驱动程序,只需在应用中添加以下依赖:
```
```
2. 创建ReactiveRedisTemplate
在使用响应式Redis时,需要使用ReactiveRedisTemplate而不是普通的RedisTemplate。可以通过以下代码创建:
```
@Configuration
public class RedisConfiguration {
@Bean
public ReactiveRedisConnectionFactory factory() {
return new LettuceConnectionFactory();
}
@Bean
public ReactiveRedisTemplate
return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string());
}
}
```
3. 增加响应式支持
默认情况下,RedisTemplate是阻塞式的,无法与响应式编程结合使用。要使用响应式Redis,需要调用reactiveConnection()方法来获得响应式连接:
```
Mono
ReactiveRedisCommands
```
4. 实现响应式缓存
在Spring Boot应用程序中使用响应式Redis时,可以实现响应式缓存来提高性能。在@EnableCaching注释上加上以下代码来启用响应式缓存支持:
```
@EnableCaching
public class AppConfig {
@Bean
public ReactiveCacheManager cacheManager(ReactiveRedisCacheWriter cacheWriter) {
RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(60));
return new RedisCacheManager(cacheWriter, cacheConfiguration);
}
@Bean
public ReactiveRedisCacheWriter cacheWriter(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisCacheWriter(factory);
}
}
```
5. 响应式数据访问
通过响应式Redis,我们可以实现响应式数据访问。以下代码展示了如何使用响应式Redis获取一条数据:
```
Mono
String value = valueMono.block();
```
6. 响应式列表操作
响应式Redis还支持响应式列表操作。以下代码演示了如何获取列表长度:
```
Mono
Long size = sizeMono.block();
```
7. 总结
通过使用Spring Boot中的响应式Redis,可以大大提高Web应用程序的性能。本文介绍了如何使用响应式Redis来实现响应式缓存和响应式数据访问,通过这些方法来提高系统的性能和响应能力。