- 以一个黑马点评项目作为 Redis 的实战篇内容
- 黑马点评半成品代码地址:https://gitcode.net/Drifter_Galaxy/hmdp.git
- 其中的
hm-dianping.zip
(后端)、nginx-1.18.0.zip
(里面有前端资源)、hmdp.sql
文件是我们需要的。- 具体如何导入该项目,见视频
P25
二、实战篇
以一个黑马点评项目作为 Redis 的实战篇内容:
- 黑马点评半成品代码地址:https://gitcode.net/Drifter_Galaxy/hmdp.git
- 其中的
hm-dianping.zip
(后端)、nginx-1.18.0.zip
(里面有前端资源)、hmdp.sql
文件是我们需要的。 - 具体如何导入该项目,见视频
P25
- 其中的
- 黑马点评半成品代码地址:https://gitcode.net/Drifter_Galaxy/hmdp.git
2.1 短信登录
- 短信登录的总流程:
2.1.1 发送短信验证码
发送短信验证码的流程:
/api
的作用:- 在 Web 开发中,使用路径中包含
/api
的约定是一种常见的做法,通常用于区分前端页面的路径和后端 API 的路径。
- 在 Web 开发中,使用路径中包含
根据短信登录总流程的第一个图示进行业务代码的编写(从接到前端请求开始往后端写):
1
2
3
4
5
6
7
8/**
* 发送手机验证码
*/
public Result sendCode( { String phone, HttpSession session)
// 发送短信验证码并保存验证码
return userService.sendCode(phone, session);
}1
2
3public interface IUserService extends IService<User> {
Result sendCode(String phone, HttpSession session);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
public Result sendCode(String phone, HttpSession session) {
// 1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 手机号不符合,返回错误信息
return Result.fail("手机号有误,请重试");
}
// 2.生成验证码
String code = RandomUtil.randomNumbers(6);
// 3.将手机号验证码保存到session
session.setAttribute("phone", phone);
session.setAttribute("code", code);
log.info("手机号验证码为:{}", code);
// 4.发送验证码
return Result.ok();
}
}
2.1.2 短信验证码登录
短信验证码登录的流程:
根据短信登录总流程的第二个图示进行业务代码的编写:
1
2
3
4
5
6
7
8
9
10/**
* 登录功能
*
* @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
*/
public Result login( { LoginFormDTO loginForm, HttpSession session)
// 实现登录功能
return userService.login(loginForm, session);
}1
2
3
4public interface IUserService extends IService<User> {
...
Result login(LoginFormDTO loginForm, HttpSession session);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1.校验手机号,防止用户在得到短信验证码后修改手机号
String phone = loginForm.getPhone();
Object cachePhone = session.getAttribute("phone");
if (cachePhone == null) {
return Result.fail("手机号过期,请重试");
}
if (!phone.equals(cachePhone.toString())) {
// 前后手机号不一致
return Result.fail("获取验证码的手机号和当前手机号不一致");
}
// 2.校验验证码
String code = loginForm.getCode();
Object cacheCode = session.getAttribute("code");
if (cacheCode == null) {
return Result.fail("验证码过期,请重试");
}
if (!code.equals(cacheCode.toString())) {
// 验证码有误
return Result.fail("验证码有误,请重试");
}
// 3.根据手机号查询用户
User user = query().eq("phone", cachePhone.toString()).one();
if (user == null) {
// 用户不存在则保存新用户数据到数据库中
user = createUserWithPhone(cachePhone.toString());
save(user);
}
// 4.保存用户信息到session中
session.setAttribute("user", user);
return Result.ok();
}
private User createUserWithPhone(String phone) {
User user = new User();
user.setPhone(phone);
user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomNumbers(10));
return user;
}
}
2.1.3 校验登录状态 - 拦截器
因为用户需要先登录,才能对点评 app 的大多数功能进行操作,所以需要一个登录校验拦截器来帮助我们校验用户的登录状态,这样就不需要在每个
Controller
中都校验用户的登录状态了。另外,XxxController
在实现功能操作时有可能需要用户的信息,所以我们要把用户的信息保存在ThreadLocal
中。根据短信登录总流程的第三个图示进行业务代码的编写:
登录校验拦截器的实现:
创建登录校验拦截器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40package com.f.utils;
import com.f.dto.UserDTO;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.servlet.http.HttpSession;
import org.springframework.web.servlet.HandlerInterceptor;
/**
* @author fzy
* @date 2024/3/6 21:18
*/
public class LoginInterceptor implements HandlerInterceptor {
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
// 1.获取session
HttpSession session = request.getSession();
// 2.获取session中的用户
UserDTO user = (UserDTO) session.getAttribute("user");
// 3.判断用户是否存在
if (user == null) {
// 如果用户不存在,则拦截,返回 401 状态码
response.setStatus(401);
return false;
}
// 如果用户存在,就将用户保存在ThreadLocal中
UserHolder.saveUser(user);
return true;
}
public void afterCompletion(HttpServletRequest request,
HttpServletResponse response,
Object handler, Exception ex) throws Exception {
// 移除用户,避免内存泄漏
UserHolder.removeUser();
}
}- 注意:在将用户信息保存在
ThreadLocal
中时,并没有将User
对象保存在ThreadLocal
中,而是将UserDTO
保存在ThreadLocal
中(UserDTO
中仅有User
的部分属性),目的是为了隐藏用户的敏感信息。
- 注意:在将用户信息保存在
注册拦截器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29package com.f.config;
import com.f.utils.LoginInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* springmvc相关的配置
* @author fzy
* @date 2024/3/6 21:25
*/
public class MvcConfig implements WebMvcConfigurer {
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
// 指定以下这些路径不拦截
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
);
}
}
1
2
3
4
5
6
public Result me() {
// 获取当前登录的用户并返回
UserDTO user = UserHolder.getUser();
return Result.ok(user);
}
2.1.4 服务器集群的session共享问题
session
共享问题:多台 Tomcat 并不共享 session 存储空间,当请求切换到不同 tomcat 服务时导致数据丢失的问题。
★基于Redis实现共享session登录
既然要使用 Redis 来存储用户的信息,那就要考虑用什么样的数据结构来存储:
- 对于验证码,考虑使用
String
类型,并且key
是手机号,value
是验证码。 - 对于用户信息,考虑使用
Hash
类型,并且key
是随机 token(不使用手机号作为key
是为了安全,因为我们要将用户信息的key
保存在浏览器内存中)。
- 对于验证码,考虑使用
接下来就基于
2.1.1 - 2.1.3
小节的代码,进行修改。发送短信验证码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
private StringRedisTemplate stringRedisTemplate;
public Result sendCode(String phone, HttpSession session) {
// 1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 手机号不符合,返回错误信息
return Result.fail("手机号有误,请重试");
}
// 2.生成验证码
String code = RandomUtil.randomNumbers(6);
// 3.将手机号验证码保存到redis,key是手机号,value是验证码,并设置有效期为两分钟
stringRedisTemplate.opsForValue().set(RedisConstants.LOGIN_CODE_KEY + phone, code, Duration.ofMinutes(RedisConstants.LOGIN_CODE_TTL));
log.info("手机号验证码为:{}", code);
// 4.发送验证码
return Result.ok();
}
......
}短信验证码登录:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
private StringRedisTemplate stringRedisTemplate;
......
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1.校验验证码
String phone = loginForm.getPhone();
String code = loginForm.getCode();
String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY + phone);
if (cacheCode == null) {
// 验证码过期
return Result.fail("验证码过期,请重试");
}
if (!code.equals(cacheCode)) {
// 验证码有误
return Result.fail("验证码有误,请重试");
}
// 2.根据手机号查询用户
User user = query().eq("phone", phone).one();
if (user == null) {
// 用户不存在则保存新用户数据到数据库中
user = createUserWithPhone(phone);
save(user);
}
// 3.保存用户信息到redis中
// 3.1 随机生成token,作为登录令牌
String token = UUID.randomUUID().toString(true);
// 3.2 将用户信息用Hash类型存储在redis中,key是token,value是用户信息,并设置有效期为30分钟
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
stringRedisTemplate.opsForHash().putAll(RedisConstants.LOGIN_USER_KEY + token, BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create().setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())));
stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token, Duration.ofMinutes(RedisConstants.LOGIN_USER_TTL));
// 4.返回token给前端
return Result.ok(token);
}
private User createUserWithPhone(String phone) {
User user = new User();
user.setPhone(phone);
user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomNumbers(10));
return user;
}
}校验登录状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57package com.f.utils;
import cn.hutool.core.bean.BeanUtil;
import com.f.dto.UserDTO;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.HandlerInterceptor;
import java.time.Duration;
import java.util.Map;
/**
* @author fzy
* @date 2024/3/6 21:18
*/
public class LoginInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization");
if (token == null) {
// 如果token不存在,则拦截,返回 401 状态码
response.setStatus(401);
return false;
}
// 2.根据token获取redis中的用户
Map userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
// 如果用户不存在,则拦截,返回 401 状态码
response.setStatus(401);
return false;
}
// 如果用户存在,就刷新token的有效期,并将用户保存在ThreadLocal中
UserDTO user = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
UserHolder.saveUser(user);
stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token, Duration.ofMinutes(RedisConstants.LOGIN_USER_TTL));
return true;
}
public void afterCompletion(HttpServletRequest request,
HttpServletResponse response,
Object handler, Exception ex) throws Exception {
// 移除用户,避免内存泄漏
UserHolder.removeUser();
}
}- 注意:如果用户一直在操作,就要一直刷新 token 的有效期。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35package com.f.config;
import com.f.utils.LoginInterceptor;
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* springmvc相关的配置
*
* @author fzy
* @date 2024/3/6 21:25
*/
public class MvcConfig implements WebMvcConfigurer {
private StringRedisTemplate stringRedisTemplate;
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
.excludePathPatterns(
// 指定以下这些路径不拦截
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
);
}
}MvcConfig
注入StringRedisTemplate
,然后传给LoginInterceptor
,因为LoginInterceptor
不是 bean,不能注入其他 bean。
登录拦截器的优化
目前登录拦截器的业务逻辑是:会被拦截的用户请求进入拦截器,进行
preHandle
操作,在preHandle
中对 redis 中的 token 进行有效期刷新。但是,如果用户在登录后,一直访问不需要拦截的路径,那 redis 中的 token 的有效期就一直不被刷新,最终 token 失效,用户还要重新登录。所以我们需要对登录拦截器进行优化:在这个拦截器前面再加个拦截器,然后在新增拦截器上进行保存用户信息到
ThreadLocal
和刷新 token 有效期的操作。创建拦截器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55package com.f.utils;
import cn.hutool.core.bean.BeanUtil;
import com.f.dto.UserDTO;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.HandlerInterceptor;
import java.time.Duration;
import java.util.Map;
/**
* @author fzy
* @date 2024/3/7 15:53
*/
public class RefreshTokenInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization");
if (token == null) {
// 如果token不存在,直接放行,由后续的拦截器进行拦截
return true;
}
// 2.根据token获取redis中的用户
Map userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
// 如果用户不存在,直接放行,由后续的拦截器进行拦截
return true;
}
// 如果用户存在,就刷新token的有效期,并将用户保存在ThreadLocal中
UserDTO user = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
UserHolder.saveUser(user);
stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token, Duration.ofMinutes(RedisConstants.LOGIN_USER_TTL));
return true;
}
public void afterCompletion(HttpServletRequest request,
HttpServletResponse response,
Object handler, Exception ex) throws Exception {
// 移除用户,避免内存泄漏
UserHolder.removeUser();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package com.f.utils;
import com.f.dto.UserDTO;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.web.servlet.HandlerInterceptor;
/**
* @author fzy
* @date 2024/3/6 21:18
*/
public class LoginInterceptor implements HandlerInterceptor {
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
UserDTO user = UserHolder.getUser();
if (user == null) {
// 如果用户不存在,则拦截,返回 401 状态码
response.setStatus(401);
return false;
}
return true;
}
}注册拦截器并指定顺序(
order
越小优先级越高):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.f.config;
import com.f.utils.LoginInterceptor;
import com.f.utils.RefreshTokenInterceptor;
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* springmvc相关的配置
*
* @author fzy
* @date 2024/3/6 21:25
*/
public class MvcConfig implements WebMvcConfigurer {
private StringRedisTemplate stringRedisTemplate;
public void addInterceptors(InterceptorRegistry registry) {
// token刷新拦截器
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate))
// 拦截所有请求
.addPathPatterns("/**")
.order(0);
// 登录拦截器
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
// 指定以下这些路径不拦截
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
)
.order(1);
}
}
2.2 ★★★商户查询缓存
- 缓存:就是数据交换的缓冲区(称作Cache),是存贮数据的临时地方,一般读写性能较高。
- 由于 Redis 基于内存,具有低延迟,速度快的特征,所以 Redis 很适合用作缓存。
2.2.1 添加商户缓存
添加商户缓存的流程:
代码如下:
1
2
3
4
5
6
7
8
9
10/**
* 根据id查询商铺信息
*
* @param id 商铺id
* @return 商铺详情数据
*/
public Result queryShopById( { Long id)
return shopService.queryShopById(id);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14package com.f.service;
import com.f.dto.Result;
import com.f.pojo.Shop;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* <p>
* 服务类
* </p>
*/
public interface IShopService extends IService<Shop> {
Result queryShopById(Long id);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46package com.f.service.impl;
import cn.hutool.json.JSONUtil;
import com.f.dto.Result;
import com.f.pojo.Shop;
import com.f.mapper.ShopMapper;
import com.f.service.IShopService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.f.utils.RedisConstants;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* <p>
* 服务实现类
* </p>
*/
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
private StringRedisTemplate stringRedisTemplate;
public Result queryShopById(Long id) {
// 1.从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
// 2.判断商铺缓存是否存在
if (shopJson != null) {
// 3.存在则直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 4.不存在则从数据库中查询商铺信息
Shop shop = getById(id);
// 5.判断商铺信息是否存在
if (shop == null) {
// 6.不存在则返回错误信息
return Result.fail("店铺不存在");
}
// 7.存在则将商铺信息写入redis,并返回商铺信息
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + shop.getId(),
JSONUtil.toJsonStr(shop), Duration.ofMinutes(RedisConstants.CACHE_SHOP_TTL));
return Result.ok(shop);
}
}- 经过测试,在使用缓存之前,查询耗时 92ms;在使用缓存之后,查询耗时 11ms,大大加快了查询效率。
2.2.2 ★缓存更新策略
如果数据库中的数据发生改变,而 Redis 还没及时更新相关数据,那么从 Redis 缓存中取到的数据就是过时的数据,就存在数据一致性问题。为了解决这个问题,我们需要使用缓存更新策略:
- 内存淘汰
- 超时剔除
- 主动更新
★主动更新
主动更新策略需要考虑下面几个问题:
- 删除缓存还是更新缓存?
- 删除缓存,查询时再更新缓存,避免无效的写操作。
- 如何保证缓存与数据库的操作同时成功或失败?
- 先操作缓存还是先操作数据库?
- 由于操作数据库的速度比操作缓存的速度慢,所以操作缓存的时候有极低概率会被操作数据库的线程抢占cpu,出现线程安全问题,而反过来出现线程安全问题的概率更大,所以先操作数据库再操作缓存。
- 视频
P38
有动画演示,不明白的话可以看一下。
- 视频
- 由于操作数据库的速度比操作缓存的速度慢,所以操作缓存的时候有极低概率会被操作数据库的线程抢占cpu,出现线程安全问题,而反过来出现线程安全问题的概率更大,所以先操作数据库再操作缓存。
- 删除缓存还是更新缓存?
总结:缓存更新策略的最佳实践方案:
- 低一致性需求:使用 Redis 自带的内存淘汰机制。
- 高一致性需求:主动更新,并以超时剔除作为兜底方案:
- 读操作:
- 缓存命中则返回。
- 缓存未命中则查询数据库,并写入缓存,设定超时时间。
- 写操作:
- 先写数据库,然后再删除缓存。
- 要确保数据库与缓存操作的原子性。
- 读操作:
2.2.3 实现商铺缓存与数据库的双写一致
在更新商铺信息的方法中,实现前面所说的:
- 先写数据库,然后再删除缓存。
- 确保数据库与缓存操作的原子性。
1
2
3
4
5
6
7
8
9
10/**
* 更新商铺信息
*
* @param shop 商铺数据
*/
public Result updateShop( { Shop shop)
// 更新商铺,包括操作数据库和操作缓存
return shopService.updateShopById(shop);
}1
2
3
4public interface IShopService extends IService<Shop> {
......
Result updateShopById(Shop shop);
}1
2
3
4
5
6
7
8
9
10
11
12// 事务保证原子性操作
public Result updateShopById(Shop shop) {
Long id = shop.getId();
if (id == null) {
return Result.fail("商铺不存在,无法更新");
}
// 1.更新数据库
updateById(shop);
// 2.删除缓存
stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id);
return Result.ok();
}
2.2.4 ★缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
缓存穿透的解决思路:
缓存空对象:
- 当用户请求的数据在缓存和数据库中都不存在时,为了避免用户不断请求该数据,给数据库造成太大压力,所以在 Redis 缓存中缓存
null
,这样用户再次重复请求后,得到的就是缓存中的null
。
- 当用户请求的数据在缓存和数据库中都不存在时,为了避免用户不断请求该数据,给数据库造成太大压力,所以在 Redis 缓存中缓存
布隆过滤:
布隆过滤器(Bloom Filter)是一种空间高效的数据结构,主要用于判断一个元素是否属于一个集合,而不存储具体的元素内容。它适用于那些需要判断大量元素是否存在的场景,如缓存、数据去重等。
布隆过滤器说数据不存在,那是一定不存在,因为如果元素存在,那么其哈希函数对应的位置应该为 1。布隆过滤器说数据存在,数据未必存在,因为不同元素经过哈希函数映射到位数组时可能发生冲突,导致多个元素的多个位置相同。
- 通过在客户端和 Redis 之间增加一层布隆过滤器来解决缓存穿透问题。不过还是存在一定的缓存穿透风险,原因如上,布隆过滤器说数据存在,数据未必存在。
在考虑了缓存穿透问题后,对查询商铺的代码进行修改:
- 增加了缓存空对象以及判断缓存内容是否为
""
的逻辑代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public Result queryShopById(Long id) {
// 1.从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
// 2.判断商铺缓存是否存在且不为空
if (shopJson != null && !shopJson.equals("")) {
// 3.存在则直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
if (shopJson != null) { // 表示商铺信息是空值,用于解决缓存穿透问题
return Result.fail("商铺信息不存在");
}
// 4.不存在则从数据库中查询商铺信息
Shop shop = getById(id);
// 5.判断商铺信息是否存在
if (shop == null) {
// 6.不存在则将空值写入redis,并返回错误信息
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id,
"", Duration.ofMinutes(RedisConstants.CACHE_NULL_TTL));
return Result.fail("店铺不存在");
}
// 7.存在则将商铺信息写入redis,并返回商铺信息
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + shop.getId(),
JSONUtil.toJsonStr(shop), Duration.ofMinutes(RedisConstants.CACHE_SHOP_TTL));
return Result.ok(shop);
}- 增加了缓存空对象以及判断缓存内容是否为
除了被动的解决缓存穿透问题,也可以主动去避免出现缓存穿透问题,例如:
- 增强 id 的复杂度,避免被猜测 id 规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
2.2.5 ★缓存雪崩
- 缓存雪崩是指在同一时段大量的缓存
key
同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。 - 缓存雪崩的解决思路:
- 给不同的
key
的TTL
添加随机值。 - 利用 Redis 集群提高服务的可用性。
- 给缓存业务添加降级限流策略,当 Redis 故障时,直接拒绝服务,避免将大量请求送到数据库,虽然降低了用户体验,但是保证了数据库的健康。
- 给业务添加多级缓存,如浏览器缓存、Nginx缓存、Redis缓存、jvm内部缓存等。
- 给不同的
2.2.6 ★缓存击穿
缓存击穿问题也叫热点
key
问题,就是一个被高并发访问并且缓存重建业务较复杂的key
突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。缓存击穿的解决方案:
- 互斥锁:查询缓存未命中,获取互斥锁,获取到互斥锁的才能查询数据库重建缓存,将数据写入缓存中后,释放锁。
- 逻辑过期:查询缓存,发现逻辑时间已经过期,获取互斥锁,开启新线程;在新线程中查询数据库重建缓存,将数据写入缓存中后,释放锁;在释放锁之前,查询该数据时,都会将过期的数据返回。
解决方案 优点 缺点 互斥锁 没有额外的内存消耗;保证一致性;实现简单 线程需要等待,性能受影响;可能有死锁风险 逻辑过期 线程无需等待,性能较好 有额外内存消耗;不保证一致性;实现复杂
基于互斥锁方式解决缓存击穿问题
核心:利用 Redis 的
setnx
方法来表示获取锁。该方法的含义是:如果 Redis 中没有这个 Key,则插入成功;如果有这个 Key,则插入失败。通过插入成功或失败来表示是否有线程插入 Key,插入成功的 Key 则认为是获取到锁的线程;释放锁就是将这个 Key 删除,因为删除 Key 以后其他线程才能再执行setnx
方法。互斥锁方式解决缓存击穿问题代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44// 在缓存穿透的基础上用互斥锁解决缓存击穿问题
public Shop queryShopWithMutex(Long id) {
// 1.从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
// 2.判断商铺缓存是否存在且不为空
if (shopJson != null && !shopJson.equals("")) {
// 3.存在则直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
if (shopJson != null) { // 表示商铺信息是空值,用于解决缓存穿透问题
return null;
}
Shop shop = null;
try {
// 4.不存在则尝试获取互斥锁
boolean isLock = tryLock(RedisConstants.LOCK_SHOP_KEY + id);
// 5.判断是否获取互斥锁
if (!isLock) {
// 6.获取互斥锁失败,休眠并重试
Thread.sleep(50);
return queryShopWithMutex(id);
}
// 7.获取互斥锁成功,去数据库查询商铺信息
shop = getById(id);
// 模拟缓存重建的延时
Thread.sleep(200);
// 8.判断商铺信息是否存在
if (shop == null) {
// 9.不存在则将空值写入redis,用于解决缓存穿透问题
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id,
"", Duration.ofMinutes(RedisConstants.CACHE_NULL_TTL));
return null;
}
// 9.存在则将商铺信息写入redis,并在finally中释放互斥锁
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + shop.getId(),
JSONUtil.toJsonStr(shop), Duration.ofMinutes(RedisConstants.CACHE_SHOP_TTL));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stringRedisTemplate.delete(RedisConstants.LOCK_SHOP_KEY + id);
}
return shop;
}1
2
3
4
5
6// 尝试获取锁,通过是否成功插入key来判断是否获取锁
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1",
Duration.ofSeconds(RedisConstants.LOCK_SHOP_TTL));
return flag;
}1
2
3
4// 删除锁
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
基于逻辑过期方式解决缓存击穿问题
既然要对
Shop
对象增加一个逻辑过期时间,那就需要增加Shop
类的属性,这违反了OCP
原则,为此,新建一个RedisData
类,用于逻辑过期时间的增加。1
2
3
4
5
6
7
8
9
10
11package com.f.utils;
import lombok.Data;
import java.time.LocalDateTime;
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}将店铺数据预设到缓存中:
1
2
3
4
5
6
7
8
9
10
11
12// 将店铺数据预设到缓存中
public void saveShop2Redis(Long id, Long expireTime) {
// 1.查询店铺数据
Shop shop = getById(id);
// 2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusDays(expireTime));
// 3.写入redis缓存,没有设置redis中的ttl
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id,
JSONUtil.toJsonStr(redisData));
}逻辑过期方式解决缓存击穿问题代码实现:
- 不考虑缓存穿透问题,因为缓存是人为预设的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// 线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
// 用逻辑过期解决缓存击穿问题(不考虑缓存穿透)
public Shop queryShopWithLogicalExpire(Long id) {
// 1.从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
// 2.判断商铺缓存是否命中
if (shopJson == null) {
// 3.未命中则返回空
return null;
}
// 4.命中则再判断商铺缓存是否过期
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断商铺缓存是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
// 6.商铺缓存未过期,直接返回
return shop;
}
// 7.商铺缓存过期,尝试获取互斥锁
boolean isLock = tryLock(RedisConstants.LOCK_SHOP_KEY + id);
// 8.判断是否获取互斥锁
if (isLock) {
// 9.如果获取互斥锁,新开一个线程用于更新商铺缓存
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
saveShop2Redis(id, RedisConstants.SHOP_EXPIRE_TTL);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放互斥锁
unlock(RedisConstants.LOCK_SHOP_KEY + id);
}
});
}
// 10.不论是否获取互斥锁,都返回过时的商铺信息
return shop;
}
总的来说,缓存穿透、缓存雪崩、缓存击穿最终带来的问题都是给数据库带来了巨大的请求压力,可能会导致数据库的崩溃,所以需要采取相关措施来解决这些缓存问题。
2.3 优惠券秒杀
2.3.1 全局唯一ID
全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具,一般要满足下列特性:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
而 Redis 恰好能满足以上大部分特性,除了安全性需要另外做一些处理。
为了 ID 的安全性,不直接使用 Redis 自增的数值,而是拼接一些其它信息:
- 符号位:1bit,永远为0,表示正数
- 时间戳:31bit,以秒为单位
- 序列号:32bit,秒内的计数器,支持每秒产生 2^32 个不同 ID
全局 ID 生成器生成全局唯一 ID 的实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39package com.f.utils;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
/**
* @author fzy
* @date 2024/3/8 16:11
*/
public class RedisIdWorker {
// 定义初始时间戳,以2000年1月1日为基准
private static final long BEGIN_TIMESTAMP = LocalDateTime
.of(2000, 1, 1, 0, 0, 0)
.toEpochSecond(ZoneOffset.UTC);
// 时间戳左移位数
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
// keyPrefix用于区分不同的业务
public long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long timeStamp = now.toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP;
// 2.生成序列号(用redis的自增长)
// 获取当前日期(精确到天)
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long incr = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + date);
// 3.拼接并返回
//return timeStamp << COUNT_BITS + incr;
return timeStamp << COUNT_BITS | incr; // 用或运算效率更高
}
}- 使用 Redis 达到全局唯一 ID 的策略:
- 每天一个 key,例如:
incr:order:2024:03:08
表示是 2024 年 3 月 8 日关于订单业务的 key,既防止 redis 自增值太大,又方便统计订单量(订单量即为 key 对应的 value)。 - ID 构造是:时间戳 + 计数器
- 每天一个 key,例如:
- 使用 Redis 达到全局唯一 ID 的策略:
2.3.2 优惠券秒杀下单
优惠券秒杀下单的流程:
下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
根据流程图进行代码编写:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27package com.f.controller;
import com.f.dto.Result;
import com.f.service.IVoucherOrderService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* <p>
* 前端控制器
* </p>
*/
public class VoucherOrderController {
private IVoucherOrderService voucherOrderService;
public Result seckillVoucher( { Long voucherId)
return voucherOrderService.seckillVoucher(voucherId);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14package com.f.service;
import com.f.dto.Result;
import com.f.pojo.VoucherOrder;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* <p>
* 服务类
* </p>
*/
public interface IVoucherOrderService extends IService<VoucherOrder> {
Result seckillVoucher(Long voucherId);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65package com.f.service.impl;
import com.f.dto.Result;
import com.f.pojo.SeckillVoucher;
import com.f.pojo.VoucherOrder;
import com.f.mapper.VoucherOrderMapper;
import com.f.service.ISeckillVoucherService;
import com.f.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.f.utils.RedisIdWorker;
import com.f.utils.UserHolder;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*/
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private ISeckillVoucherService seckillVoucherService;
private RedisIdWorker redisIdWorker;
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始或结束
LocalDateTime now = LocalDateTime.now();
if (voucher.getBeginTime().isAfter(now)
|| voucher.getEndTime().isBefore(now)) {
// 3.如果秒杀还未开始或者已经结束,返回错误信息
return Result.fail("秒杀还未开始或已经结束");
}
// 4.如果秒杀开始,且没有结束,再判断库存是否充足
if (voucher.getStock() < 1) {
// 5.库存不足,返回错误信息
return Result.fail("库存不足");
}
// 6.库存充足,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId) // where id = ?
.update();
if (!success) {
return Result.fail("库存不足");
}
// 7.创建订单
VoucherOrder order = new VoucherOrder();
order.setId(redisIdWorker.nextId("order"));
order.setUserId(UserHolder.getUser().getId());
order.setVoucherId(voucherId);
save(order);
// 8.返回订单id
return Result.ok(order.getId());
}
}
2.3.3 ★超卖问题
在高并发场景下,上面的代码会出现超卖问题,即,虽然库存只有 100 张特价券,但却出现卖出超过 100 张特价券的情况。这是因为多线程在同时操作同一个资源。
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁,加锁分为两种:悲观锁和乐观锁:
- 悲观锁比较简单,直接添加同步锁,让线程串行执行,但性能不高。
我们重点来看乐观锁。
★乐观锁解决超卖问题
乐观锁的关键是在修改时,判断之前查询得到的数据是否有被修改过(也就是在 set 语句中添加 where 条件),常见的方式有两种:
版本号法:每个数据记录都有一个版本号,用于追踪数据的变化。在并发操作中,乐观锁机制通过比较版本号来判断数据是否已经被其他事务修改,从而协调事务的执行。
CAS法:
CAS(Compare And Swap)
,CAS 有 3 个操作数:- 需要读写的内存值
V
- 需要读写的内存值
进行比较的值
A
- 拟写入的新值
B
当且仅当预期值
A
和内存值V
相同时,将内存值V
修改为B
,否则什么都不做。CAS
法可以看作是版本号法的简化。
- 拟写入的新值
接下来用
CAS
法解决库存超卖问题,只需要修改前面代码中的第 6 步即可:1
2
3
4
5
6
7
8// 6.库存充足,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).eq("stock", voucher.getStock()) // where id = ? and stock = ? 添加了乐观锁
.update();
if (!success) {
return Result.fail("库存不足");
}结果是不会出现线程安全问题,但是特价券会出现过剩的情况,即 100 张特价券并没有被卖完。这就是乐观所的弊端:例如多个线程一开始标识 stock 为 100,然后有个线程把 stock 减一了,其他那些线程就会返回错误,特价券就剩余下来了。
改进:不去在 sql 语句中判断库存是否改变,而是在 sql 语句中判断库存是否 >0:
1
2
3
4
5
6
7
8// 6.库存充足,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
.update();
if (!success) {
return Result.fail("库存不足");
}
乐观锁用于修改数据时。
2.3.4 一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。
在之前的代码中增加判断用户是否已经买过该特价券的业务逻辑:
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private ISeckillVoucherService seckillVoucherService;
private RedisIdWorker redisIdWorker;
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始或结束
LocalDateTime now = LocalDateTime.now();
if (voucher.getBeginTime().isAfter(now)
|| voucher.getEndTime().isBefore(now)) {
// 3.如果秒杀还未开始或者已经结束,返回错误信息
return Result.fail("秒杀还未开始或已经结束");
}
// 4.如果秒杀开始,且没有结束,再判断库存是否充足
if (voucher.getStock() < 1) {
// 5.库存不足,返回错误信息
return Result.fail("库存不足");
}
// 6.判断用户是否已经买过该特价券了
Long userId = UserHolder.getUser().getId();
long count = query().eq("user_id", userId).eq("voucher_id", voucherId)
.count();
// 7.如果用户已经买过该特价券,就返回错误信息
if (count > 0) {
return Result.fail("您已经购买过该特价券,每人仅限一张");
}
// 8.库存充足,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
.update();
if (!success) {
return Result.fail("库存不足");
}
// 9.创建订单
VoucherOrder order = new VoucherOrder();
order.setId(redisIdWorker.nextId("order"));
order.setUserId(userId);
order.setVoucherId(voucherId);
save(order);
// 10.返回订单id
return Result.ok(order.getId());
}
}- 用 jmeter 压测发现,用户还是可以购买多张特价券,原因在于:先查询,再判断。在多并发场景下,如果多个用户同时查询,那在判断的时候,就出现了多线程导致的错误。
既然是多线程错误,那加锁吧,但是注意:不能用乐观锁,因为乐观锁用于修改数据时,而这里是查询数据,所以我们要用悲观锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77package com.f.service.impl;
import com.f.dto.Result;
import com.f.pojo.SeckillVoucher;
import com.f.pojo.VoucherOrder;
import com.f.mapper.VoucherOrderMapper;
import com.f.service.ISeckillVoucherService;
import com.f.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.f.utils.RedisIdWorker;
import com.f.utils.UserHolder;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*/
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private ISeckillVoucherService seckillVoucherService;
private RedisIdWorker redisIdWorker;
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始或结束
LocalDateTime now = LocalDateTime.now();
if (voucher.getBeginTime().isAfter(now)
|| voucher.getEndTime().isBefore(now)) {
// 3.如果秒杀还未开始或者已经结束,返回错误信息
return Result.fail("秒杀还未开始或已经结束");
}
// 4.如果秒杀开始,且没有结束,再判断库存是否充足
if (voucher.getStock() < 1) {
// 5.库存不足,返回错误信息
return Result.fail("库存不足");
}
return createVoucherOrder(voucherId);
}
public synchronized Result createVoucherOrder(Long voucherId) { // 使用悲观锁
// 6.判断用户是否已经买过该特价券了
Long userId = UserHolder.getUser().getId();
long count = query().eq("user_id", userId).eq("voucher_id", voucherId)
.count();
// 7.如果用户已经买过该特价券,就返回错误信息
if (count > 0) {
return Result.fail("您已经购买过该特价券,每人仅限一张");
}
// 8.库存充足,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
.update();
if (!success) {
return Result.fail("库存不足");
}
// 9.创建订单
VoucherOrder order = new VoucherOrder();
order.setId(redisIdWorker.nextId("order"));
order.setUserId(userId);
order.setVoucherId(voucherId);
save(order);
// 10.返回订单id
return Result.ok(order.getId());
}
}
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了,因为在集群模式下,我们会有多个 jvm 的锁监视器,就导致可能出现同一个用户的多个线程都能执行的情况。具体可以看视频
P55
。- 为了解决这个问题,我们需要使用分布式锁。
2.3.5 ★★★分布式锁
- 在集群模式下,
synchronized
锁失效了,**synchronized
只能保证单个 jvm 内部的多个线程之间互斥**,而没有办法让集群模式下的多个 jvm 进程之间互斥。
2.3.5.1 分布式锁-原理
为了实现多个 jvm 进程之间的互斥,我们不去使用 jvm 内部的锁监视器,而是在外部开一个锁监视器,让它监视所有的线程。
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁需要满足的特性:
- 多进程可见,互斥,高可用,高性能,安全性。
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
- MySQL:MySQL 本身带有锁机制,但是由于 MySQL 性能一般,所以采用分布式锁的情况下,使用 MySQL 作为分布式锁比较少见。
- Redis:Redis 作为分布式锁比较常见,利用
setnx
方法,如果 Key 插入成功,则表示获取到锁,插入失败则表示无法获取到锁。 - Zookeeper:Zookeeper 也是企业级开发中比较好的一个实现分布式锁的方案。
MySQL Redis Zookeeper 互斥 利用 MySQL 本身的互斥锁机制 利用 setnx
互斥命令利用节点的唯一性和有序性 高可用 好 好 好 高性能 一般 好 一般 安全性 断开链接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开链接自动释放
2.3.5.2 分布式锁-Redis实现
我们在这里基于 Redis 来实现分布式锁:
实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁。
1
2# 添加锁,利用setnx的互斥特性
SETNX lock thread1
释放锁:
手动释放:
1
2# 释放锁,删除即可
DEL lock超时释放:获取锁时添加一个超时时间。
1
2# 添加锁过期时间避免服务宕机引起的死锁
EXPIRE lock 10
思考:如何做到添加锁操作和超时释放锁操作必须同时成功或者同时失败?
避免刚添加完锁后,服务器宕机,不能进行超时释放锁的操作。
SET
操作和EXPIRE
操作写在同个语句即可:SET lock thread1 EX 10 NX
(NX 表示 key 不存在的时候才可以 SET)。
接下来基于 Redis 实现分布式锁的初级版本:
定义分布式锁的接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20package com.f.utils;
/**
* @author fzy
* @date 2024/3/9 9:41
*/
public interface ILock {
/**
* 获取锁
*
* @param timeoutSec 锁的超时时间,过期后自动释放
* @return true代表获取锁成功,false代表失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}实现接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package com.f.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.time.Duration;
/**
* @author fzy
* @date 2024/3/9 9:44
*/
public class SimpleRedisLock implements ILock {
private String key;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
public SimpleRedisLock(String key, StringRedisTemplate stringRedisTemplate) {
this.key = key;
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean tryLock(long timeoutSec) {
// 获取当前线程标识
Long threadId = Thread.currentThread().getId();
return Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + key, threadId.toString(), Duration.ofSeconds(timeoutSec)));
}
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + key);
}
}在一人一单业务中使用分布式锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93package com.f.service.impl;
import com.f.dto.Result;
import com.f.pojo.SeckillVoucher;
import com.f.pojo.VoucherOrder;
import com.f.mapper.VoucherOrderMapper;
import com.f.service.ISeckillVoucherService;
import com.f.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.f.utils.RedisIdWorker;
import com.f.utils.SimpleRedisLock;
import com.f.utils.UserHolder;
import jakarta.annotation.Resource;
import org.springframework.aop.framework.AopContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*/
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private ISeckillVoucherService seckillVoucherService;
private RedisIdWorker redisIdWorker;
private StringRedisTemplate stringRedisTemplate;
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始或结束
LocalDateTime now = LocalDateTime.now();
if (voucher.getBeginTime().isAfter(now)
|| voucher.getEndTime().isBefore(now)) {
// 3.如果秒杀还未开始或者已经结束,返回错误信息
return Result.fail("秒杀还未开始或已经结束");
}
// 4.如果秒杀开始,且没有结束,再判断库存是否充足
if (voucher.getStock() < 1) {
// 5.库存不足,返回错误信息
return Result.fail("库存不足");
}
// 使用分布式锁(对同一个用户的id上锁)
SimpleRedisLock lock = new SimpleRedisLock("order:" + UserHolder.getUser().getId(), stringRedisTemplate);
// 获取锁
boolean isLock = lock.tryLock(10L);
if (!isLock) { // 获取锁失败
return Result.fail("您已经购买过该特价券,每人仅限一张");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); // 代理对象才能实现事务
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock(); // 释放锁
}
}
public Result createVoucherOrder(Long voucherId) {
// 6.判断用户是否已经买过该特价券了
Long userId = UserHolder.getUser().getId();
long count = query().eq("user_id", userId).eq("voucher_id", voucherId)
.count();
// 7.如果用户已经买过该特价券,就返回错误信息
if (count > 0) {
return Result.fail("您已经购买过该特价券,每人仅限一张");
}
// 8.库存充足,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
.update();
if (!success) {
return Result.fail("库存不足");
}
// 9.创建订单
VoucherOrder order = new VoucherOrder();
order.setId(redisIdWorker.nextId("order"));
order.setUserId(userId);
order.setVoucherId(voucherId);
save(order);
// 10.返回订单id
return Result.ok(order.getId());
}
}
2.3.5.3 Redis分布式锁误删问题
考虑这样一种极端情况:
- 线程 1 首先获取分布式锁,然后执行自己的业务,但是线程 1 的业务阻塞了,超过了分布式锁的 TTL,所以分布式锁被超时释放了,此时线程 1 还在执行业务。
- 线程 2 来获取分布式锁,因为线程 1 的分布式锁被超时释放,所以线程 2 可以获取分布式锁,然后执行自己的业务。
- 此时问题来了,在线程 2 执行自己的业务时,线程 1 的业务完成,它开始手动释放锁,因为线程 1 和线程 2 共用一把锁,这就导致,线程 1 将线程 2 的分布式锁释放了。
- 线程 3 再来获取分布式锁,因为分布式锁被线程 1 释放了(按理来说应该是由线程 2 来释放的),所以线程 3 可以获取分布式锁,然后执行自己的业务。
这就造成了分布式锁误删的问题。
为了解决这一问题,我们需要在释放锁的时候判断锁的标识是否一致,Redis 锁的标识一般是指 value 的区分,前面在获取锁时,我们是将线程标识,即线程 id 作为 value 存入,因此在释放锁时,只要判断当前线程的 id 是否为当前锁的 value 即可。
- 这里的线程标识,我们之前用的是线程 id 进行标识,但是如果放到集群线程下,多个 jvm 可能会出现同个线程 id 的线程,这样会引发线程安全问题,所以这里要用
ThreadID + UUID
,用UUID
来区分不同的 jvm,用ThreadID
来区分不同的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.f.utils;
import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.time.Duration;
/**
* @author fzy
* @date 2024/3/9 9:44
*/
public class SimpleRedisLock implements ILock {
private String key;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private static final String UUID_PREFIX = UUID.randomUUID().toString(true);
public SimpleRedisLock(String key, StringRedisTemplate stringRedisTemplate) {
this.key = key;
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean tryLock(long timeoutSec) {
// 获取当前线程标识
String threadId = UUID_PREFIX + "-" + Thread.currentThread().getId();
return Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + key, threadId, Duration.ofSeconds(timeoutSec)));
}
public void unlock() {
// 获取线程标识
String threadId = UUID_PREFIX + "-" + Thread.currentThread().getId();
// 获取锁中标识
String value = stringRedisTemplate.opsForValue().get(KEY_PREFIX + key);
// 判断线程标识和锁中标识是否一致
if (threadId.equals(value)) {
// 一致则释放锁
stringRedisTemplate.delete(KEY_PREFIX + key);
}
}
}- 这里的线程标识,我们之前用的是线程 id 进行标识,但是如果放到集群线程下,多个 jvm 可能会出现同个线程 id 的线程,这样会引发线程安全问题,所以这里要用
2.3.5.4 分布式锁的原子性问题
我们考虑一种比较极端的情况:
在
SimpleRedisLock
的unlock
方法中,“判断线程标识和锁中标识是否一致” 和 “释放锁” 是两步操作,在这两步操作中,如果发生了阻塞,就有可能会出现并发问题:- 线程 1 首先获取分布式锁,然后执行自己的业务,线程 1 执行完自己的业务,并判断线程标识和锁中标识一致,就在线程 1 要释放分布式锁时,发生了阻塞(例如 jvm 的垃圾回收机制会阻塞所有代码)。
- 线程 1 一直阻塞直到超时释放锁,此时线程 2 来获取分布式锁,因为线程 1 的分布式锁被超时释放,所以线程 2 可以获取分布式锁,然后执行自己的业务。
- 此时问题来了,在线程 2 执行自己的业务时,线程 1 的阻塞结束,因为线程 1 在之前已经判断线程标识和锁中标识一致,所以线程 1 接下来直接执行释放锁的操作,于是线程 1 将线程 2 的分布式锁释放了。
- 线程 3 再来获取分布式锁,因为分布式锁被线程 1 释放了(按理来说应该是由线程 2 来释放的),所以线程 3 可以获取分布式锁,然后执行自己的业务。
同样出现了分布式锁被误删的问题,这里的原因在于:“判断线程标识和锁中标识是否一致” 和 “释放锁” 是两步操作,存在原子性问题。
Lua脚本
Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。Lua 是一种编
程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.htmlLua 语言调用 Redis:
redis.call('命令名称', 'key', '其他参数', ...)
- 写好脚本以后,使用 Redis 命令来调用脚本:
EVAL script numkeys key [key ...] arg [arg ...]
- 写好脚本以后,使用 Redis 命令来调用脚本:
在介绍了 Lua 脚本后,再来看看之前分布式锁的原子性问题:
目前
unlock
的业务流程:- 获取锁中的线程标识
- 判断是否与指定的标识(当前线程标识)一致
- 如果一致则释放锁(删除)
- 如果不一致则什么都不做
用 Lua 脚本:
1
2
3
4
5
6
7
8
9
10
11-- 锁的key
-- local key = KEYS[1]
-- 当前线程标识
-- local threadId = ARGV[1]
-- 获取锁中的线程标识
local id = redis.call('get',KEYS[1])
-- 比较
if(id == ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0
Java调用Lua脚本
1 | package com.f.utils; |
- 总结:
- 基于 Redis 的分布式锁的实现思路:
- 利用
set nx ex
获取锁,并设置过期时间,保存线程标识; - 释放锁时先判断线程标识是否与自己一致,一致则删除锁,并使用 Lua 脚本保证原子性操作
- 利用
- 特性:
- 利用
set nx
满足互斥性 - 利用
set ex
保证故障时锁依然能释放,避免死锁,提高安全性 - 利用 Redis 集群保证高可用和高并发特性
- 利用
- 基于 Redis 的分布式锁的实现思路:
2.3.6 Redisson
- 基于
setnx
实现的分布式锁存在下面的问题:- 不可重入:同一个线程无法多次获取同一把锁。
- 不可重试:获取锁只尝试一次就返回 false,没有重试机制。
- 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患。
- 主从一致性:如果 Redis 提供了主从集群,由于主从同步存在延迟,当主宕机时,如果从还未同步主中的锁数据,则会出现主从一致性问题。
- Redisson 可以帮助我们,Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid),它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
2.3.6.1 Redisson入门
使用 Redisson 的步骤:
导入 Redisson 依赖:
1
2
3
4
5
6<!--redisson依赖-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>配置 Redisson 客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24package com.f.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author fzy
* @date 2024/3/12 21:47
*/
public class RedissonConfig {
public RedissonClient redissonClient() {
// 配置类
Config config = new Config();
// 添加Redis地址
config.useSingleServer().setAddress("redis://192.168.44.132:6379").setPassword("123456");
// 返回RedissonClient对象
return Redisson.create(config);
}
}使用 Redisson 的分布式锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private RedissonClient redissonClient;
public Result seckillVoucher(Long voucherId) {
......
// 使用Redisson的分布式锁(对同一个用户的id上锁)
RLock lock = redissonClient.getLock("lock:order:" + UserHolder.getUser().getId());
// 获取锁
boolean isLock = lock.tryLock();
if (!isLock) { // 获取锁失败
return Result.fail("您已经购买过该特价券,每人仅限一张");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); // 代理对象才能实现事务
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock(); // 释放锁
}
}
2.3.6.2 Redisson可重入锁原理
为什么要引入可重入锁这种机制?
- 假如有一个线程 1 获得了对象 A 的锁,那么该线程 1 如果在未释放锁前再次请求该对象的锁时,如果没有可重入锁的机制,线程 1 是不会再次获取到锁的,这样的话就会出现死锁的情况。
- 可重入锁的概念就是:自己可以获取自己的锁。
为了实现可重入锁,就要在锁已经存在的情况下进行判断,判断当前锁是否是自己的锁,如果是自己的锁,就可以再次获取,并将获取次数加 1,如果不是自己的锁,就获取锁失败。
因为要另外记录重入次数,所以不能再用 String 类型的数据结构,而是要使用 Hash 类型的数据结构:
- 第一次获取锁时,value 值为 1,然后每重入一次,value 值加一。
释放锁时,不是直接删除锁,而是将锁的 value 值减一,直到 value 为 0 的时候才可以删除锁。
因为 Hash 类型的 Redis 命令没有
set ex nx
这种命令,所以我们只能先判断是否存在(exist),再设置过期时间。为了保证原子性操作,我们要将上图中锁的业务逻辑写在 Lua 脚本里。Redisson 的可重入锁就利用了上面类似的原理。
2.3.6.3 Redisson锁重试和WatchDog机制
Redisson 分布式锁原理:
- 可重入:利用 Hash 结构记录线程标识和重入次数。
- 可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制。
- 超时续约:利用 WatchDog 机制,每隔一段时间(leaseTime / 3),重置超时时间。
剩下还有一个主从一致性问题。
2.3.6.4 Redisson的multiLock原理
既然使用 Redis 的主从集群会出现主从一致性问题,那就干脆不使用主从集群,而是将每个 Redis 节点都作为主节点。
- 在获取锁时,应用会向每个 Redis 主节点都获取锁,即向每个 Redis 主节点都进行
set nx ex
操作,只有向每个 Redis 主节点进行的set nx ex
操作都成功了,应用才能真正获取锁。- 为了 Redis 集群的高性能,可以在每个 Redis 主节点后面再挂上从节点。
- 此时,就算其中一个 Redis 主节点宕机了,相应的从节点中没有保存锁数据,也不会出现并发问题:
- 就算别的应用能对该 “新的主节点” 进行
set nx ex
操作来获取锁(因为没有保存锁数据),但是该应用不能对其他的 Redis 主节点进行获取锁的操作,而我们说过,只有向每个 Redis 主节点进行的set nx ex
操作都成功了,应用才能真正获取锁,所以该应用并不能获取锁。
- 就算别的应用能对该 “新的主节点” 进行
这样,就不会因为主从一致性,而导致并发问题。
- 这种将多个锁联合在一起,成为一个联锁的方案,就是
multiLock
。
- 在获取锁时,应用会向每个 Redis 主节点都获取锁,即向每个 Redis 主节点都进行
2.3.6.5 ★总结
- 分布式锁总结:
- 不可重入Redis分布式锁:
- 原理:利用
setnx
的互斥性;利用ex
避免死锁;释放锁时判断线程标识 - 缺陷:不可重入、无法重试、锁超时失效
- 原理:利用
- 可重入Redis分布式锁:
- 原理:利用 Hash 结构,记录线程标识和重入次数;利用信号量控制锁重试等待;利用 WatchDog 延续锁时间
- 解决了不可重入、无法重试、锁超时失效的问题
- 缺陷:Redis 主节点宕机引起锁失效问题
- 原理:利用 Hash 结构,记录线程标识和重入次数;利用信号量控制锁重试等待;利用 WatchDog 延续锁时间
- **
multiLock
**:- 原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷:运维成本高、实现复杂
- 不可重入Redis分布式锁:
2.3.7 Redis秒杀优化
由于目前的优惠券秒杀业务包含以下几步:
- 查询优惠券,判断秒杀库存
- 查询订单,校验一人一单(用到分布式锁,保证一人一单)
- 减库存(用到乐观锁,防止超卖)
- 创建订单
这几步都涉及到和数据库的操作,所以业务的性能并不高,业务耗时较长。
为了提高业务性能,考虑对业务流程进行优化:
判断秒杀库存和校验一人一单的步骤完成后,其实就可以返回结果给客户,而不需要等到减库存和创建订单的步骤完成,特别是减库存和创建订单涉及数据库的写操作,耗时会比较久。
- 可以在判断秒杀库存和校验一人一单的步骤完成后,就将优惠券id、用户id、订单id的相关信息保存到阻塞队列,然后就返回结果给客户。而且通过 Redis 进行这两步来提高性能。
- 接着由另外的线程异步读取阻塞队列中的信息,完成减库存和创建订单的操作。
通过异步执行优惠券秒杀业务,来提高业务性能。
用 String 类型来存储库存,用 Set 类型来存储用户 id(保证唯一性,即保证一人一单)。
改进秒杀业务,提高并发性能。
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中
- 基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券 id 和用户 id 封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
2.3.7.1 基于Redis完成秒杀资格的判断
新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到redis中
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:'..voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:'..voucherId
-- 3.脚本业务
-- 3.1判断库存是否充足
if (tonumber(redis.call('get', stockKey)) <= 0)
then return 1
end
-- 3.2判断用户是否下单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId)==1)
then return 2
end
-- 3.3扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.4下单 sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0
2.3.7.2 基于阻塞队列实现异步秒杀下单
如果抢购成功,将下单信息封装后存入阻塞队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Result seckillVoucher(Long voucherId) {
// 1.执行lua脚本
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString());
// 2.判断结果是否为0
// 3.不为0则返回错误信息
if (!result.equals(0L)) {
return Result.fail(result.equals(1L) ? "库存不足" : "您已经购买过该特价券,每人仅限一张");
}
// 4.为0则将下单信息保存到阻塞队列中
VoucherOrder order = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
order.setId(orderId);
order.setUserId(userId);
order.setVoucherId(voucherId);
orderTasks.add(order);
// 5.返回订单id
return Result.ok(orderId);
}开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25private class VoucherOrderHandler implements Runnable {
public void run() {
while (true) {
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.减少库存,创建订单
// 2.1库存充足,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
.update();
if (!success) {
log.error("库存不足");
return;
}
// 2.2创建订单
save(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}最终优化后的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81/**
* <p>
* 服务实现类
* </p>
*/
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private ISeckillVoucherService seckillVoucherService;
private RedisIdWorker redisIdWorker;
private StringRedisTemplate stringRedisTemplate;
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 创建阻塞队列
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
public void run() {
while (true) {
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.减少库存,创建订单
// 2.1库存充足,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
.update();
if (!success) {
log.error("库存不足");
return;
}
// 2.2创建订单
save(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}
public Result seckillVoucher(Long voucherId) {
// 1.执行lua脚本
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString());
// 2.判断结果是否为0
// 3.不为0则返回错误信息
if (!result.equals(0L)) {
return Result.fail(result.equals(1L) ? "库存不足" : "您已经购买过该特价券,每人仅限一张");
}
// 4.为0则将下单信息保存到阻塞队列中
VoucherOrder order = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
order.setId(orderId);
order.setUserId(userId);
order.setVoucherId(voucherId);
orderTasks.add(order);
// 5.返回订单id
return Result.ok(orderId);
}
}
2.3.8 Redis消息队列实现异步秒杀
上一节我们基于 jvm 的阻塞队列进行秒杀优化存在 2 个问题:
- jvm 的内存限制问题。
- 数据安全问题:jvm 的内存数据没有持久化,每当服务器重启或者宕机或者从阻塞队列取的时候遇到异常,数据都会丢失。
解决办法: 消息队列。
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括 3 个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
视频中后面讲的是 Redis 自己提供的消息队列,而不是用市面上主流的消息队列,所以就不记笔记了。
2.4 关注推送(Feed流)
关注推送也叫做 Feed 流,直译为投喂。为用户持续的提供 “沉浸式” 的体验,通过无限下拉刷新获取新的信息。
Feed 流产品有两种常见模式:
- Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注,例如朋友圈。
- 优点:信息全面,不会有缺失。并且实现也相对简单
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
- 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容,推送用户感兴趣信息来吸引用户。
- 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
- 缺点:如果算法不精准,可能起到反作用
本例中的个人页面,是基于关注的好友来做 Feed 流,因此采用 Timeline 的模式。该模式的实现方案有三种:
- 拉模式(读扩散)
推模式(写扩散)
- 推拉结合(读写混合)
| | 拉模式 | 推模式 | 推拉结合 |
| :———-: | :——: | :—————: | :——————-: |
| 写比例 | 低 | 高 | 中 |
| 读比例 | 高 | 低 | 中 |
| 用户读取延迟 | 高 | 低 | 低 |
| 实现难度 | 复杂 | 简单 | 很复杂 |
| 使用场景 | 很少使用 | 用户量少,没有大V | 过千万的用户量,有大V |- 这三种模式的具体动画表示可以看视频
P84
本例采用实现简单的推模式,通过将用户发布的文章推送到粉丝的收件箱来实现关注推送。
2.4.1 基于推模式实现关注推送
需求:
修改新增探店笔记的业务,在保存 blog 到数据库的同时,推送到粉丝的收件箱
收件箱满足可以根据时间戳排序,必须用 Redis 的数据结构实现
- 根据目前所学,
List
或者SortedSet
类型的数据结构都能满足要求。
- 根据目前所学,
查询收件箱数据时,可以实现分页查询
Feed 流的分页问题:Feed 流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。
需要使用滚动分页,通过记录上一次读取的页数的
lastId
来避免重复读取的情况。- 因为要实现分页查询的滚动分页,所以
List
类型的数据结构就不能满足要求了,因为它只能根据角标来分页,而在 Feed 流中角标是会变化的。由于SortedSet
类型的数据结构有ZRANGEBYSCORE key min max
命令,所以就可以满足我们的要求。
- 因为要实现分页查询的滚动分页,所以
修改新增探店笔记的业务,在保存 blog 到数据库的同时,推送到粉丝的收件箱:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Result saveBlog(Blog blog) {
// 1.获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 2.保存探店笔记
boolean isSuccess = save(blog);
if (!isSuccess) {
return Result.fail("保存笔记失败");
}
// 3.查找所有关注了作者的粉丝
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
// 4.将笔记的id推送到粉丝的收件箱
for (Follow follow : follows) {
Long userId = follow.getUserId(); // 粉丝id
stringRedisTemplate.opsForZSet().add(RedisConstants.FEED_KEY + userId,
blog.getId().toString(),
System.currentTimeMillis());
}
// 5.返回笔记id
return Result.ok(blog.getId());
}
滚动分页
查询收件箱数据时,可以实现分页查询:
滚动分页查询主要使用
SortedSet
的ZRANGEBYSCORE
方法,因为以时间戳作为 score,越新的笔记排名理应越靠前,所以要降序排序,所以应该使用ZREVRANGEBYSCORE
方法,主要涉及四个参数:ZREVRANGEBYSCORE key max min [WITHSCORES][LIMIT offset count]
max
:指定范围对应的最大 score- 第一次查询给当前时间戳,往后查询给 “上一次查询的
min
”
- 第一次查询给当前时间戳,往后查询给 “上一次查询的
min
:指定范围对应的最小 score- 因为 score 为时间戳,所以恒为正数,那么把
min
固定为 0 即可
- 因为 score 为时间戳,所以恒为正数,那么把
offset
:读取偏移量,为 0 表示从max
所在的数据开始向下读count
条数据- 第一次查询给 0,往后查询给 “上一次查询的
min
对应的数据个数”,避免好几条数据因为 score 值相同而出现问题,例如下面这样,m6
被重复查询了。
- 第一次查询给 0,往后查询给 “上一次查询的
count
:读取数据的条数- 由前端传值,来决定一页显示几条数据
1
2
3
4
5
6
7
8
9
10
11
12package com.f.dto;
import lombok.Data;
import java.util.List;
public class ScrollResult {
private List<?> list;
private Long minTime; // 查询到的min,作为下一次查询的max
private Integer offset; // 读取偏移量
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public Result queryBlogOfFollow(Long max, Integer offset) {
// 1.获取当前用户
Long userId = UserHolder.getUser().getId();
// 2.根据当前用户找到对应的收件箱,并根据max和offset找到收件箱对应的片段
String key = RedisConstants.FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 3);
if (tuples == null || tuples.isEmpty()) { // 非空判断
return Result.ok();
}
// 3.解析数据:blogId、minTime、offset
List<String> blogIds = new ArrayList<>();
long minTime = 0;
int count = 1;
for (ZSetOperations.TypedTuple<String> tuple : tuples) {
// 获取blogId
String blogId = tuple.getValue();
blogIds.add(blogId);
// 获取时间戳和offset
long time = tuple.getScore().longValue();
if (minTime == time) {
count++;
} else {
minTime = time;
count = 1;
}
}
// 4.根据blogId查询对应的博客并封装
List<Blog> blogs = new ArrayList<>();
for (String blogId : blogIds) {
Blog blog = query().eq("id", blogId).one();
queryBlogUser(blog);
blogs.add(blog);
}
// 5.返回结果
ScrollResult scrollResult = new ScrollResult();
scrollResult.setList(blogs);
scrollResult.setMinTime(minTime);
scrollResult.setOffset(count);
return Result.ok(scrollResult);
}
2.5 UV统计
UV
:全称Unique Visitor
,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1 天内同一个用户多次访问该网站,只记录 1 次。- 相应的还有一个
PV
概念:全称Page View
,也叫页面访问量或点击量,用户每访问网站的一个页面,记录 1 次 PV,用户多次打开页面,则记录多次 PV,PV 往往用来衡量网站的流量。
UV 统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存,但是如果每个访问的用户都保存到 Redis 中,数据量会非常恐怖。所以我们采用
HyperLogLog
来进行统计。- 相应的还有一个
2.5.1 HyperLogLog
Hyperloglog(HLL)
是从Loglog
算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。Redis 中的 HLL 是基于 String 结构实现的,单个 HLL 的内存永远小于 16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于 0.81% 的误差,不过对于 UV 统计来说,这完全可以忽略。
下面是 Redis 中 HyperLogLog 的相关命令:
2.5.2 使用HyperLogLog实现UV统计
1 |
|