0%

Redis实战篇

  • 以一个黑马点评项目作为 Redis 的实战篇内容
    • 黑马点评半成品代码地址:https://gitcode.net/Drifter_Galaxy/hmdp.git
      • 其中的 hm-dianping.zip(后端)、nginx-1.18.0.zip(里面有前端资源)、hmdp.sql 文件是我们需要的。
      • 具体如何导入该项目,见视频 P25

二、实战篇

  • 以一个黑马点评项目作为 Redis 的实战篇内容:

    • 黑马点评半成品代码地址:https://gitcode.net/Drifter_Galaxy/hmdp.git
      • 其中的 hm-dianping.zip(后端)、nginx-1.18.0.zip(里面有前端资源)、hmdp.sql 文件是我们需要的。
      • 具体如何导入该项目,见视频 P25

2.1 短信登录

  • 短信登录的总流程:

2.1.1 发送短信验证码

  • 发送短信验证码的流程:

    • /api 的作用:
      • 在 Web 开发中,使用路径中包含 /api 的约定是一种常见的做法,通常用于区分前端页面的路径和后端 API 的路径。
  • 根据短信登录总流程的第一个图示进行业务代码的编写(从接到前端请求开始往后端写):

    1
    2
    3
    4
    5
    6
    7
    8
    /**
    * 发送手机验证码
    */
    @PostMapping("/code")
    public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
    // 发送短信验证码并保存验证码
    return userService.sendCode(phone, session);
    }
    1
    2
    3
    public interface IUserService extends IService<User> {
    Result sendCode(String phone, HttpSession session);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
    @Override
    public Result sendCode(String phone, HttpSession session) {
    // 1.校验手机号
    if (RegexUtils.isPhoneInvalid(phone)) {
    // 手机号不符合,返回错误信息
    return Result.fail("手机号有误,请重试");
    }
    // 2.生成验证码
    String code = RandomUtil.randomNumbers(6);
    // 3.将手机号验证码保存到session
    session.setAttribute("phone", phone);
    session.setAttribute("code", code);
    log.info("手机号验证码为:{}", code);
    // 4.发送验证码
    return Result.ok();
    }
    }

2.1.2 短信验证码登录

  • 短信验证码登录的流程:

  • 根据短信登录总流程的第二个图示进行业务代码的编写:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 登录功能
    *
    * @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
    */
    @PostMapping("/login")
    public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session) {
    // 实现登录功能
    return userService.login(loginForm, session);
    }
    1
    2
    3
    4
    public interface IUserService extends IService<User> {
    ...
    Result login(LoginFormDTO loginForm, HttpSession session);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
    // 1.校验手机号,防止用户在得到短信验证码后修改手机号
    String phone = loginForm.getPhone();
    Object cachePhone = session.getAttribute("phone");
    if (cachePhone == null) {
    return Result.fail("手机号过期,请重试");
    }
    if (!phone.equals(cachePhone.toString())) {
    // 前后手机号不一致
    return Result.fail("获取验证码的手机号和当前手机号不一致");
    }
    // 2.校验验证码
    String code = loginForm.getCode();
    Object cacheCode = session.getAttribute("code");
    if (cacheCode == null) {
    return Result.fail("验证码过期,请重试");
    }
    if (!code.equals(cacheCode.toString())) {
    // 验证码有误
    return Result.fail("验证码有误,请重试");
    }
    // 3.根据手机号查询用户
    User user = query().eq("phone", cachePhone.toString()).one();
    if (user == null) {
    // 用户不存在则保存新用户数据到数据库中
    user = createUserWithPhone(cachePhone.toString());
    save(user);
    }
    // 4.保存用户信息到session中
    session.setAttribute("user", user);
    return Result.ok();
    }

    private User createUserWithPhone(String phone) {
    User user = new User();
    user.setPhone(phone);
    user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomNumbers(10));
    return user;
    }
    }

2.1.3 校验登录状态 - 拦截器

  • 因为用户需要先登录,才能对点评 app 的大多数功能进行操作,所以需要一个登录校验拦截器来帮助我们校验用户的登录状态,这样就不需要在每个 Controller 中都校验用户的登录状态了。另外,XxxController 在实现功能操作时有可能需要用户的信息,所以我们要把用户的信息保存在 ThreadLocal 中。

  • 根据短信登录总流程的第三个图示进行业务代码的编写:

    • 登录校验拦截器的实现:

      • 创建登录校验拦截器:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        package com.f.utils;

        import com.f.dto.UserDTO;
        import jakarta.servlet.http.HttpServletRequest;
        import jakarta.servlet.http.HttpServletResponse;
        import jakarta.servlet.http.HttpSession;
        import org.springframework.web.servlet.HandlerInterceptor;

        /**
        * @author fzy
        * @date 2024/3/6 21:18
        */
        public class LoginInterceptor implements HandlerInterceptor {
        @Override
        public boolean preHandle(HttpServletRequest request,
        HttpServletResponse response,
        Object handler) throws Exception {
        // 1.获取session
        HttpSession session = request.getSession();
        // 2.获取session中的用户
        UserDTO user = (UserDTO) session.getAttribute("user");
        // 3.判断用户是否存在
        if (user == null) {
        // 如果用户不存在,则拦截,返回 401 状态码
        response.setStatus(401);
        return false;
        }
        // 如果用户存在,就将用户保存在ThreadLocal中
        UserHolder.saveUser(user);
        return true;
        }

        @Override
        public void afterCompletion(HttpServletRequest request,
        HttpServletResponse response,
        Object handler, Exception ex) throws Exception {
        // 移除用户,避免内存泄漏
        UserHolder.removeUser();
        }
        }
        • 注意:在将用户信息保存在 ThreadLocal 中时,并没有将 User 对象保存在 ThreadLocal 中,而是将 UserDTO 保存在 ThreadLocal 中(UserDTO 中仅有 User 的部分属性),目的是为了隐藏用户的敏感信息。
      • 注册拦截器:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        package com.f.config;

        import com.f.utils.LoginInterceptor;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
        import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

        /**
        * springmvc相关的配置
        * @author fzy
        * @date 2024/3/6 21:25
        */
        @Configuration
        public class MvcConfig implements WebMvcConfigurer {
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
        .excludePathPatterns(
        // 指定以下这些路径不拦截
        "/user/code",
        "/user/login",
        "/blog/hot",
        "/shop/**",
        "/shop-type/**",
        "/upload/**",
        "/voucher/**"
        );
        }
        }
    1
    2
    3
    4
    5
    6
    @GetMapping("/me")
    public Result me() {
    // 获取当前登录的用户并返回
    UserDTO user = UserHolder.getUser();
    return Result.ok(user);
    }

2.1.4 服务器集群的session共享问题

  • session 共享问题:多台 Tomcat 并不共享 session 存储空间,当请求切换到不同 tomcat 服务时导致数据丢失的问题
★基于Redis实现共享session登录
  • 既然要使用 Redis 来存储用户的信息,那就要考虑用什么样的数据结构来存储:

    • 对于验证码,考虑使用 String 类型,并且 key 是手机号,value 是验证码。
    • 对于用户信息,考虑使用 Hash 类型,并且 key 是随机 token(不使用手机号作为 key 是为了安全,因为我们要将用户信息的 key 保存在浏览器内存中)。
  • 接下来就基于 2.1.1 - 2.1.3 小节的代码,进行修改。

    • 发送短信验证码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
      @Resource
      private StringRedisTemplate stringRedisTemplate;

      @Override
      public Result sendCode(String phone, HttpSession session) {
      // 1.校验手机号
      if (RegexUtils.isPhoneInvalid(phone)) {
      // 手机号不符合,返回错误信息
      return Result.fail("手机号有误,请重试");
      }
      // 2.生成验证码
      String code = RandomUtil.randomNumbers(6);
      // 3.将手机号验证码保存到redis,key是手机号,value是验证码,并设置有效期为两分钟
      stringRedisTemplate.opsForValue().set(RedisConstants.LOGIN_CODE_KEY + phone, code, Duration.ofMinutes(RedisConstants.LOGIN_CODE_TTL));
      log.info("手机号验证码为:{}", code);
      // 4.发送验证码
      return Result.ok();
      }
      ......
      }
    • 短信验证码登录:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
      @Resource
      private StringRedisTemplate stringRedisTemplate;
      ......

      @Override
      public Result login(LoginFormDTO loginForm, HttpSession session) {
      // 1.校验验证码
      String phone = loginForm.getPhone();
      String code = loginForm.getCode();
      String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY + phone);
      if (cacheCode == null) {
      // 验证码过期
      return Result.fail("验证码过期,请重试");
      }
      if (!code.equals(cacheCode)) {
      // 验证码有误
      return Result.fail("验证码有误,请重试");
      }
      // 2.根据手机号查询用户
      User user = query().eq("phone", phone).one();
      if (user == null) {
      // 用户不存在则保存新用户数据到数据库中
      user = createUserWithPhone(phone);
      save(user);
      }
      // 3.保存用户信息到redis中
      // 3.1 随机生成token,作为登录令牌
      String token = UUID.randomUUID().toString(true);
      // 3.2 将用户信息用Hash类型存储在redis中,key是token,value是用户信息,并设置有效期为30分钟
      UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
      stringRedisTemplate.opsForHash().putAll(RedisConstants.LOGIN_USER_KEY + token, BeanUtil.beanToMap(userDTO, new HashMap<>(),
      CopyOptions.create().setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())));
      stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token, Duration.ofMinutes(RedisConstants.LOGIN_USER_TTL));
      // 4.返回token给前端
      return Result.ok(token);
      }

      private User createUserWithPhone(String phone) {
      User user = new User();
      user.setPhone(phone);
      user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomNumbers(10));
      return user;
      }
      }
    • 校验登录状态:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      package com.f.utils;

      import cn.hutool.core.bean.BeanUtil;
      import com.f.dto.UserDTO;
      import jakarta.servlet.http.HttpServletRequest;
      import jakarta.servlet.http.HttpServletResponse;
      import org.springframework.data.redis.core.StringRedisTemplate;
      import org.springframework.web.servlet.HandlerInterceptor;

      import java.time.Duration;
      import java.util.Map;

      /**
      * @author fzy
      * @date 2024/3/6 21:18
      */
      public class LoginInterceptor implements HandlerInterceptor {
      private StringRedisTemplate stringRedisTemplate;

      public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {
      this.stringRedisTemplate = stringRedisTemplate;
      }

      @Override
      public boolean preHandle(HttpServletRequest request,
      HttpServletResponse response,
      Object handler) throws Exception {
      // 1.获取请求头中的token
      String token = request.getHeader("authorization");
      if (token == null) {
      // 如果token不存在,则拦截,返回 401 状态码
      response.setStatus(401);
      return false;
      }
      // 2.根据token获取redis中的用户
      Map userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
      // 3.判断用户是否存在
      if (userMap.isEmpty()) {
      // 如果用户不存在,则拦截,返回 401 状态码
      response.setStatus(401);
      return false;
      }
      // 如果用户存在,就刷新token的有效期,并将用户保存在ThreadLocal中
      UserDTO user = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
      UserHolder.saveUser(user);
      stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token, Duration.ofMinutes(RedisConstants.LOGIN_USER_TTL));
      return true;
      }

      @Override
      public void afterCompletion(HttpServletRequest request,
      HttpServletResponse response,
      Object handler, Exception ex) throws Exception {
      // 移除用户,避免内存泄漏
      UserHolder.removeUser();
      }
      }
      • 注意:如果用户一直在操作,就要一直刷新 token 的有效期。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      package com.f.config;

      import com.f.utils.LoginInterceptor;
      import jakarta.annotation.Resource;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.data.redis.core.StringRedisTemplate;
      import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
      import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

      /**
      * springmvc相关的配置
      *
      * @author fzy
      * @date 2024/3/6 21:25
      */
      @Configuration
      public class MvcConfig implements WebMvcConfigurer {
      @Resource
      private StringRedisTemplate stringRedisTemplate;

      @Override
      public void addInterceptors(InterceptorRegistry registry) {
      registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
      .excludePathPatterns(
      // 指定以下这些路径不拦截
      "/user/code",
      "/user/login",
      "/blog/hot",
      "/shop/**",
      "/shop-type/**",
      "/upload/**",
      "/voucher/**"
      );
      }
      }
      • MvcConfig 注入 StringRedisTemplate,然后传给 LoginInterceptor,因为 LoginInterceptor 不是 bean,不能注入其他 bean。
登录拦截器的优化
  • 目前登录拦截器的业务逻辑是:会被拦截的用户请求进入拦截器,进行 preHandle 操作,在 preHandle 中对 redis 中的 token 进行有效期刷新。

    但是,如果用户在登录后,一直访问不需要拦截的路径,那 redis 中的 token 的有效期就一直不被刷新,最终 token 失效,用户还要重新登录。所以我们需要对登录拦截器进行优化:在这个拦截器前面再加个拦截器,然后在新增拦截器上进行保存用户信息到 ThreadLocal 和刷新 token 有效期的操作

    • 创建拦截器:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      package com.f.utils;

      import cn.hutool.core.bean.BeanUtil;
      import com.f.dto.UserDTO;
      import jakarta.servlet.http.HttpServletRequest;
      import jakarta.servlet.http.HttpServletResponse;
      import org.springframework.data.redis.core.StringRedisTemplate;
      import org.springframework.web.servlet.HandlerInterceptor;

      import java.time.Duration;
      import java.util.Map;

      /**
      * @author fzy
      * @date 2024/3/7 15:53
      */
      public class RefreshTokenInterceptor implements HandlerInterceptor {
      private StringRedisTemplate stringRedisTemplate;

      public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
      this.stringRedisTemplate = stringRedisTemplate;
      }

      @Override
      public boolean preHandle(HttpServletRequest request,
      HttpServletResponse response,
      Object handler) throws Exception {
      // 1.获取请求头中的token
      String token = request.getHeader("authorization");
      if (token == null) {
      // 如果token不存在,直接放行,由后续的拦截器进行拦截
      return true;
      }
      // 2.根据token获取redis中的用户
      Map userMap = stringRedisTemplate.opsForHash().entries(RedisConstants.LOGIN_USER_KEY + token);
      // 3.判断用户是否存在
      if (userMap.isEmpty()) {
      // 如果用户不存在,直接放行,由后续的拦截器进行拦截
      return true;
      }
      // 如果用户存在,就刷新token的有效期,并将用户保存在ThreadLocal中
      UserDTO user = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
      UserHolder.saveUser(user);
      stringRedisTemplate.expire(RedisConstants.LOGIN_USER_KEY + token, Duration.ofMinutes(RedisConstants.LOGIN_USER_TTL));
      return true;
      }

      @Override
      public void afterCompletion(HttpServletRequest request,
      HttpServletResponse response,
      Object handler, Exception ex) throws Exception {
      // 移除用户,避免内存泄漏
      UserHolder.removeUser();
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      package com.f.utils;

      import com.f.dto.UserDTO;
      import jakarta.servlet.http.HttpServletRequest;
      import jakarta.servlet.http.HttpServletResponse;
      import org.springframework.web.servlet.HandlerInterceptor;

      /**
      * @author fzy
      * @date 2024/3/6 21:18
      */
      public class LoginInterceptor implements HandlerInterceptor {
      @Override
      public boolean preHandle(HttpServletRequest request,
      HttpServletResponse response,
      Object handler) throws Exception {
      UserDTO user = UserHolder.getUser();
      if (user == null) {
      // 如果用户不存在,则拦截,返回 401 状态码
      response.setStatus(401);
      return false;
      }
      return true;
      }
      }
    • 注册拦截器并指定顺序(order 越小优先级越高):

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      package com.f.config;

      import com.f.utils.LoginInterceptor;
      import com.f.utils.RefreshTokenInterceptor;
      import jakarta.annotation.Resource;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.data.redis.core.StringRedisTemplate;
      import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
      import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

      /**
      * springmvc相关的配置
      *
      * @author fzy
      * @date 2024/3/6 21:25
      */
      @Configuration
      public class MvcConfig implements WebMvcConfigurer {
      @Resource
      private StringRedisTemplate stringRedisTemplate;

      @Override
      public void addInterceptors(InterceptorRegistry registry) {
      // token刷新拦截器
      registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate))
      // 拦截所有请求
      .addPathPatterns("/**")
      .order(0);
      // 登录拦截器
      registry.addInterceptor(new LoginInterceptor())
      .excludePathPatterns(
      // 指定以下这些路径不拦截
      "/user/code",
      "/user/login",
      "/blog/hot",
      "/shop/**",
      "/shop-type/**",
      "/upload/**",
      "/voucher/**"
      )
      .order(1);
      }
      }

2.2 ★★★商户查询缓存

  • 缓存:就是数据交换的缓冲区(称作Cache),是存贮数据的临时地方,一般读写性能较高。
    • 由于 Redis 基于内存,具有低延迟,速度快的特征,所以 Redis 很适合用作缓存。

2.2.1 添加商户缓存

  • 添加商户缓存的流程:

    • 代码如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      /**
      * 根据id查询商铺信息
      *
      * @param id 商铺id
      * @return 商铺详情数据
      */
      @GetMapping("/{id}")
      public Result queryShopById(@PathVariable("id") Long id) {
      return shopService.queryShopById(id);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      package com.f.service;

      import com.f.dto.Result;
      import com.f.pojo.Shop;
      import com.baomidou.mybatisplus.extension.service.IService;

      /**
      * <p>
      * 服务类
      * </p>
      */
      public interface IShopService extends IService<Shop> {
      Result queryShopById(Long id);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      package com.f.service.impl;

      import cn.hutool.json.JSONUtil;
      import com.f.dto.Result;
      import com.f.pojo.Shop;
      import com.f.mapper.ShopMapper;
      import com.f.service.IShopService;
      import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
      import com.f.utils.RedisConstants;
      import jakarta.annotation.Resource;
      import org.springframework.data.redis.core.StringRedisTemplate;
      import org.springframework.stereotype.Service;

      /**
      * <p>
      * 服务实现类
      * </p>
      */
      @Service
      public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
      @Resource
      private StringRedisTemplate stringRedisTemplate;

      @Override
      public Result queryShopById(Long id) {
      // 1.从redis中查询商铺缓存
      String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
      // 2.判断商铺缓存是否存在
      if (shopJson != null) {
      // 3.存在则直接返回
      Shop shop = JSONUtil.toBean(shopJson, Shop.class);
      return Result.ok(shop);
      }
      // 4.不存在则从数据库中查询商铺信息
      Shop shop = getById(id);
      // 5.判断商铺信息是否存在
      if (shop == null) {
      // 6.不存在则返回错误信息
      return Result.fail("店铺不存在");
      }
      // 7.存在则将商铺信息写入redis,并返回商铺信息
      stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + shop.getId(),
      JSONUtil.toJsonStr(shop), Duration.ofMinutes(RedisConstants.CACHE_SHOP_TTL));
      return Result.ok(shop);
      }
      }
      • 经过测试,在使用缓存之前,查询耗时 92ms;在使用缓存之后,查询耗时 11ms,大大加快了查询效率。

2.2.2 ★缓存更新策略

  • 如果数据库中的数据发生改变,而 Redis 还没及时更新相关数据,那么从 Redis 缓存中取到的数据就是过时的数据,就存在数据一致性问题。为了解决这个问题,我们需要使用缓存更新策略:

    • 内存淘汰
    • 超时剔除
    • 主动更新
★主动更新
  • 主动更新策略需要考虑下面几个问题:

    • 删除缓存还是更新缓存?
      • 删除缓存,查询时再更新缓存,避免无效的写操作
    • 如何保证缓存与数据库的操作同时成功或失败?
    • 先操作缓存还是先操作数据库?
      • 由于操作数据库的速度比操作缓存的速度慢,所以操作缓存的时候有极低概率会被操作数据库的线程抢占cpu,出现线程安全问题,而反过来出现线程安全问题的概率更大,所以先操作数据库再操作缓存
        • 视频 P38 有动画演示,不明白的话可以看一下。
  • 总结:缓存更新策略的最佳实践方案:

    1. 低一致性需求:使用 Redis 自带的内存淘汰机制。
    2. 高一致性需求:主动更新,并以超时剔除作为兜底方案:
      • 读操作:
        • 缓存命中则返回。
        • 缓存未命中则查询数据库,并写入缓存,设定超时时间。
      • 写操作:
        • 先写数据库,然后再删除缓存。
        • 要确保数据库与缓存操作的原子性。

2.2.3 实现商铺缓存与数据库的双写一致

  • 在更新商铺信息的方法中,实现前面所说的:

    • 先写数据库,然后再删除缓存。
    • 确保数据库与缓存操作的原子性。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 更新商铺信息
    *
    * @param shop 商铺数据
    */
    @PutMapping
    public Result updateShop(@RequestBody Shop shop) {
    // 更新商铺,包括操作数据库和操作缓存
    return shopService.updateShopById(shop);
    }
    1
    2
    3
    4
    public interface IShopService extends IService<Shop> {
    ......
    Result updateShopById(Shop shop);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Transactional  // 事务保证原子性操作
    public Result updateShopById(Shop shop) {
    Long id = shop.getId();
    if (id == null) {
    return Result.fail("商铺不存在,无法更新");
    }
    // 1.更新数据库
    updateById(shop);
    // 2.删除缓存
    stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id);
    return Result.ok();
    }

2.2.4 ★缓存穿透

  • 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

  • 缓存穿透的解决思路:

    • 缓存空对象

      • 当用户请求的数据在缓存和数据库中都不存在时,为了避免用户不断请求该数据,给数据库造成太大压力,所以在 Redis 缓存中缓存 null,这样用户再次重复请求后,得到的就是缓存中的 null
    • 布隆过滤

      布隆过滤器(Bloom Filter)是一种空间高效的数据结构,主要用于判断一个元素是否属于一个集合,而不存储具体的元素内容。它适用于那些需要判断大量元素是否存在的场景,如缓存、数据去重等。

      布隆过滤器说数据不存在,那是一定不存在,因为如果元素存在,那么其哈希函数对应的位置应该为 1。布隆过滤器说数据存在,数据未必存在,因为不同元素经过哈希函数映射到位数组时可能发生冲突,导致多个元素的多个位置相同。

      • 通过在客户端和 Redis 之间增加一层布隆过滤器来解决缓存穿透问题。不过还是存在一定的缓存穿透风险,原因如上,布隆过滤器说数据存在,数据未必存在。
  • 在考虑了缓存穿透问题后,对查询商铺的代码进行修改:

    • 增加了缓存空对象以及判断缓存内容是否为 "" 的逻辑代码。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Override
    public Result queryShopById(Long id) {
    // 1.从redis中查询商铺缓存
    String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
    // 2.判断商铺缓存是否存在且不为空
    if (shopJson != null && !shopJson.equals("")) {
    // 3.存在则直接返回
    Shop shop = JSONUtil.toBean(shopJson, Shop.class);
    return Result.ok(shop);
    }
    if (shopJson != null) { // 表示商铺信息是空值,用于解决缓存穿透问题
    return Result.fail("商铺信息不存在");
    }
    // 4.不存在则从数据库中查询商铺信息
    Shop shop = getById(id);
    // 5.判断商铺信息是否存在
    if (shop == null) {
    // 6.不存在则将空值写入redis,并返回错误信息
    stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id,
    "", Duration.ofMinutes(RedisConstants.CACHE_NULL_TTL));
    return Result.fail("店铺不存在");
    }
    // 7.存在则将商铺信息写入redis,并返回商铺信息
    stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + shop.getId(),
    JSONUtil.toJsonStr(shop), Duration.ofMinutes(RedisConstants.CACHE_SHOP_TTL));
    return Result.ok(shop);
    }
  • 除了被动的解决缓存穿透问题,也可以主动去避免出现缓存穿透问题,例如:

    • 增强 id 的复杂度,避免被猜测 id 规律
    • 做好数据的基础格式校验
    • 加强用户权限校验
    • 做好热点参数的限流

2.2.5 ★缓存雪崩

  • 缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。
  • 缓存雪崩的解决思路:
    • 给不同的 keyTTL 添加随机值。
    • 利用 Redis 集群提高服务的可用性。
    • 给缓存业务添加降级限流策略,当 Redis 故障时,直接拒绝服务,避免将大量请求送到数据库,虽然降低了用户体验,但是保证了数据库的健康。
    • 给业务添加多级缓存,如浏览器缓存、Nginx缓存、Redis缓存、jvm内部缓存等。

2.2.6 ★缓存击穿

  • 缓存击穿问题也叫热点 key 问题,就是一个被高并发访问并且缓存重建业务较复杂key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

  • 缓存击穿的解决方案:

    • 互斥锁:查询缓存未命中,获取互斥锁,获取到互斥锁的才能查询数据库重建缓存,将数据写入缓存中后,释放锁。
    • 逻辑过期:查询缓存,发现逻辑时间已经过期,获取互斥锁,开启新线程;在新线程中查询数据库重建缓存,将数据写入缓存中后,释放锁;在释放锁之前,查询该数据时,都会将过期的数据返回。
    解决方案优点缺点
    互斥锁没有额外的内存消耗;保证一致性;实现简单线程需要等待,性能受影响;可能有死锁风险
    逻辑过期线程无需等待,性能较好有额外内存消耗;不保证一致性;实现复杂
基于互斥锁方式解决缓存击穿问题
  • 核心:利用 Redis 的 setnx 方法来表示获取锁。该方法的含义是:如果 Redis 中没有这个 Key,则插入成功;如果有这个 Key,则插入失败。通过插入成功或失败来表示是否有线程插入 Key,插入成功的 Key 则认为是获取到锁的线程;释放锁就是将这个 Key 删除,因为删除 Key 以后其他线程才能再执行 setnx 方法。

    • 互斥锁方式解决缓存击穿问题代码实现:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      // 在缓存穿透的基础上用互斥锁解决缓存击穿问题
      public Shop queryShopWithMutex(Long id) {
      // 1.从redis中查询商铺缓存
      String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
      // 2.判断商铺缓存是否存在且不为空
      if (shopJson != null && !shopJson.equals("")) {
      // 3.存在则直接返回
      Shop shop = JSONUtil.toBean(shopJson, Shop.class);
      return shop;
      }
      if (shopJson != null) { // 表示商铺信息是空值,用于解决缓存穿透问题
      return null;
      }
      Shop shop = null;
      try {
      // 4.不存在则尝试获取互斥锁
      boolean isLock = tryLock(RedisConstants.LOCK_SHOP_KEY + id);
      // 5.判断是否获取互斥锁
      if (!isLock) {
      // 6.获取互斥锁失败,休眠并重试
      Thread.sleep(50);
      return queryShopWithMutex(id);
      }
      // 7.获取互斥锁成功,去数据库查询商铺信息
      shop = getById(id);
      // 模拟缓存重建的延时
      Thread.sleep(200);
      // 8.判断商铺信息是否存在
      if (shop == null) {
      // 9.不存在则将空值写入redis,用于解决缓存穿透问题
      stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id,
      "", Duration.ofMinutes(RedisConstants.CACHE_NULL_TTL));
      return null;
      }
      // 9.存在则将商铺信息写入redis,并在finally中释放互斥锁
      stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + shop.getId(),
      JSONUtil.toJsonStr(shop), Duration.ofMinutes(RedisConstants.CACHE_SHOP_TTL));
      } catch (InterruptedException e) {
      e.printStackTrace();
      } finally {
      stringRedisTemplate.delete(RedisConstants.LOCK_SHOP_KEY + id);
      }
      return shop;
      }
      1
      2
      3
      4
      5
      6
      // 尝试获取锁,通过是否成功插入key来判断是否获取锁
      private boolean tryLock(String key) {
      Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1",
      Duration.ofSeconds(RedisConstants.LOCK_SHOP_TTL));
      return flag;
      }
      1
      2
      3
      4
      // 删除锁
      private void unlock(String key) {
      stringRedisTemplate.delete(key);
      }
基于逻辑过期方式解决缓存击穿问题
  • 既然要对 Shop 对象增加一个逻辑过期时间,那就需要增加 Shop 类的属性,这违反了 OCP 原则,为此,新建一个 RedisData 类,用于逻辑过期时间的增加。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    package com.f.utils;

    import lombok.Data;

    import java.time.LocalDateTime;

    @Data
    public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
    }
    • 将店铺数据预设到缓存中:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      // 将店铺数据预设到缓存中
      public void saveShop2Redis(Long id, Long expireTime) {
      // 1.查询店铺数据
      Shop shop = getById(id);
      // 2.封装逻辑过期时间
      RedisData redisData = new RedisData();
      redisData.setData(shop);
      redisData.setExpireTime(LocalDateTime.now().plusDays(expireTime));
      // 3.写入redis缓存,没有设置redis中的ttl
      stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id,
      JSONUtil.toJsonStr(redisData));
      }
    • 逻辑过期方式解决缓存击穿问题代码实现:

      • 不考虑缓存穿透问题,因为缓存是人为预设的。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      // 线程池
      private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

      // 用逻辑过期解决缓存击穿问题(不考虑缓存穿透)
      public Shop queryShopWithLogicalExpire(Long id) {
      // 1.从redis中查询商铺缓存
      String shopJson = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
      // 2.判断商铺缓存是否命中
      if (shopJson == null) {
      // 3.未命中则返回空
      return null;
      }
      // 4.命中则再判断商铺缓存是否过期
      RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
      Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
      LocalDateTime expireTime = redisData.getExpireTime();
      // 5.判断商铺缓存是否过期
      if (expireTime.isAfter(LocalDateTime.now())) {
      // 6.商铺缓存未过期,直接返回
      return shop;
      }
      // 7.商铺缓存过期,尝试获取互斥锁
      boolean isLock = tryLock(RedisConstants.LOCK_SHOP_KEY + id);
      // 8.判断是否获取互斥锁
      if (isLock) {
      // 9.如果获取互斥锁,新开一个线程用于更新商铺缓存
      CACHE_REBUILD_EXECUTOR.submit(() -> {
      try {
      saveShop2Redis(id, RedisConstants.SHOP_EXPIRE_TTL);
      } catch (Exception e) {
      e.printStackTrace();
      } finally {
      // 释放互斥锁
      unlock(RedisConstants.LOCK_SHOP_KEY + id);
      }
      });
      }
      // 10.不论是否获取互斥锁,都返回过时的商铺信息
      return shop;
      }
  • 总的来说,缓存穿透、缓存雪崩、缓存击穿最终带来的问题都是给数据库带来了巨大的请求压力,可能会导致数据库的崩溃,所以需要采取相关措施来解决这些缓存问题。

2.3 优惠券秒杀

2.3.1 全局唯一ID

  • 全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具,一般要满足下列特性:

    • 唯一性
    • 高可用
    • 高性能
    • 递增性
    • 安全性

    而 Redis 恰好能满足以上大部分特性,除了安全性需要另外做一些处理。

    • 为了 ID 的安全性,不直接使用 Redis 自增的数值,而是拼接一些其它信息:

      • 符号位:1bit,永远为0,表示正数
      • 时间戳:31bit,以秒为单位
      • 序列号:32bit,秒内的计数器,支持每秒产生 2^32 个不同 ID
  • 全局 ID 生成器生成全局唯一 ID 的实现代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package com.f.utils;

    import jakarta.annotation.Resource;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    /**
    * @author fzy
    * @date 2024/3/8 16:11
    */
    @Component
    public class RedisIdWorker {
    // 定义初始时间戳,以2000年1月1日为基准
    private static final long BEGIN_TIMESTAMP = LocalDateTime
    .of(2000, 1, 1, 0, 0, 0)
    .toEpochSecond(ZoneOffset.UTC);
    // 时间戳左移位数
    private static final int COUNT_BITS = 32;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    // keyPrefix用于区分不同的业务
    public long nextId(String keyPrefix) {
    // 1.生成时间戳
    LocalDateTime now = LocalDateTime.now();
    long timeStamp = now.toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP;
    // 2.生成序列号(用redis的自增长)
    // 获取当前日期(精确到天)
    String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    long incr = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + date);
    // 3.拼接并返回
    //return timeStamp << COUNT_BITS + incr;
    return timeStamp << COUNT_BITS | incr; // 用或运算效率更高
    }
    }
    • 使用 Redis 达到全局唯一 ID 的策略:
      • 每天一个 key,例如:incr:order:2024:03:08 表示是 2024 年 3 月 8 日关于订单业务的 key,既防止 redis 自增值太大,又方便统计订单量(订单量即为 key 对应的 value)。
      • ID 构造是:时间戳 + 计数器

2.3.2 优惠券秒杀下单

  • 优惠券秒杀下单的流程:

    • 下单时需要判断两点:

      • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
      • 库存是否充足,不足则无法下单
    • 根据流程图进行代码编写:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      package com.f.controller;


      import com.f.dto.Result;
      import com.f.service.IVoucherOrderService;
      import jakarta.annotation.Resource;
      import org.springframework.web.bind.annotation.PathVariable;
      import org.springframework.web.bind.annotation.PostMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;

      /**
      * <p>
      * 前端控制器
      * </p>
      */
      @RestController
      @RequestMapping("/voucher-order")
      public class VoucherOrderController {
      @Resource
      private IVoucherOrderService voucherOrderService;

      @PostMapping("seckill/{id}")
      public Result seckillVoucher(@PathVariable("id") Long voucherId) {
      return voucherOrderService.seckillVoucher(voucherId);
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      package com.f.service;

      import com.f.dto.Result;
      import com.f.pojo.VoucherOrder;
      import com.baomidou.mybatisplus.extension.service.IService;

      /**
      * <p>
      * 服务类
      * </p>
      */
      public interface IVoucherOrderService extends IService<VoucherOrder> {
      Result seckillVoucher(Long voucherId);
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      package com.f.service.impl;

      import com.f.dto.Result;
      import com.f.pojo.SeckillVoucher;
      import com.f.pojo.VoucherOrder;
      import com.f.mapper.VoucherOrderMapper;
      import com.f.service.ISeckillVoucherService;
      import com.f.service.IVoucherOrderService;
      import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
      import com.f.utils.RedisIdWorker;
      import com.f.utils.UserHolder;
      import jakarta.annotation.Resource;
      import org.springframework.stereotype.Service;
      import org.springframework.transaction.annotation.Transactional;

      import java.time.LocalDateTime;

      /**
      * <p>
      * 服务实现类
      * </p>
      */
      @Service
      public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
      @Resource
      private ISeckillVoucherService seckillVoucherService;

      @Resource
      private RedisIdWorker redisIdWorker;

      @Override
      @Transactional
      public Result seckillVoucher(Long voucherId) {
      // 1.查询优惠券信息
      SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
      // 2.判断秒杀是否开始或结束
      LocalDateTime now = LocalDateTime.now();
      if (voucher.getBeginTime().isAfter(now)
      || voucher.getEndTime().isBefore(now)) {
      // 3.如果秒杀还未开始或者已经结束,返回错误信息
      return Result.fail("秒杀还未开始或已经结束");
      }
      // 4.如果秒杀开始,且没有结束,再判断库存是否充足
      if (voucher.getStock() < 1) {
      // 5.库存不足,返回错误信息
      return Result.fail("库存不足");
      }
      // 6.库存充足,扣减库存
      boolean success = seckillVoucherService.update()
      .setSql("stock = stock -1") // set stock = stock - 1
      .eq("voucher_id", voucherId) // where id = ?
      .update();
      if (!success) {
      return Result.fail("库存不足");
      }
      // 7.创建订单
      VoucherOrder order = new VoucherOrder();
      order.setId(redisIdWorker.nextId("order"));
      order.setUserId(UserHolder.getUser().getId());
      order.setVoucherId(voucherId);
      save(order);
      // 8.返回订单id
      return Result.ok(order.getId());
      }
      }

2.3.3 ★超卖问题

  • 在高并发场景下,上面的代码会出现超卖问题,即,虽然库存只有 100 张特价券,但却出现卖出超过 100 张特价券的情况。这是因为多线程在同时操作同一个资源。

    超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁,加锁分为两种:悲观锁乐观锁

    • 悲观锁比较简单,直接添加同步锁,让线程串行执行,但性能不高。
  • 我们重点来看乐观锁。

★乐观锁解决超卖问题
  • 乐观锁的关键是在修改时,判断之前查询得到的数据是否有被修改过(也就是在 set 语句中添加 where 条件),常见的方式有两种:

    • 版本号法:每个数据记录都有一个版本号,用于追踪数据的变化。在并发操作中,乐观锁机制通过比较版本号来判断数据是否已经被其他事务修改,从而协调事务的执行。

    • CAS法CAS(Compare And Swap),CAS 有 3 个操作数:

      • 需要读写的内存值 V
    • 进行比较的值 A

      • 拟写入的新值 B

      当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。

      • CAS 法可以看作是版本号法的简化。
  • 接下来用 CAS 法解决库存超卖问题,只需要修改前面代码中的第 6 步即可:

    1
    2
    3
    4
    5
    6
    7
    8
    // 6.库存充足,扣减库存
    boolean success = seckillVoucherService.update()
    .setSql("stock = stock -1") // set stock = stock - 1
    .eq("voucher_id", voucherId).eq("stock", voucher.getStock()) // where id = ? and stock = ? 添加了乐观锁
    .update();
    if (!success) {
    return Result.fail("库存不足");
    }
    • 结果是不会出现线程安全问题,但是特价券会出现过剩的情况,即 100 张特价券并没有被卖完。这就是乐观所的弊端:例如多个线程一开始标识 stock 为 100,然后有个线程把 stock 减一了,其他那些线程就会返回错误,特价券就剩余下来了。

    • 改进:不去在 sql 语句中判断库存是否改变,而是在 sql 语句中判断库存是否 >0:

      1
      2
      3
      4
      5
      6
      7
      8
      // 6.库存充足,扣减库存
      boolean success = seckillVoucherService.update()
      .setSql("stock = stock -1") // set stock = stock - 1
      .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
      .update();
      if (!success) {
      return Result.fail("库存不足");
      }
  • 乐观锁用于修改数据时。

2.3.4 一人一单

  • 需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。

  • 在之前的代码中增加判断用户是否已经买过该特价券的业务逻辑:

    • 代码实现:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      @Service
      public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
      @Resource
      private ISeckillVoucherService seckillVoucherService;

      @Resource
      private RedisIdWorker redisIdWorker;

      @Override
      @Transactional
      public Result seckillVoucher(Long voucherId) {
      // 1.查询优惠券信息
      SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
      // 2.判断秒杀是否开始或结束
      LocalDateTime now = LocalDateTime.now();
      if (voucher.getBeginTime().isAfter(now)
      || voucher.getEndTime().isBefore(now)) {
      // 3.如果秒杀还未开始或者已经结束,返回错误信息
      return Result.fail("秒杀还未开始或已经结束");
      }
      // 4.如果秒杀开始,且没有结束,再判断库存是否充足
      if (voucher.getStock() < 1) {
      // 5.库存不足,返回错误信息
      return Result.fail("库存不足");
      }
      // 6.判断用户是否已经买过该特价券了
      Long userId = UserHolder.getUser().getId();
      long count = query().eq("user_id", userId).eq("voucher_id", voucherId)
      .count();
      // 7.如果用户已经买过该特价券,就返回错误信息
      if (count > 0) {
      return Result.fail("您已经购买过该特价券,每人仅限一张");
      }
      // 8.库存充足,扣减库存
      boolean success = seckillVoucherService.update()
      .setSql("stock = stock -1") // set stock = stock - 1
      .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
      .update();
      if (!success) {
      return Result.fail("库存不足");
      }
      // 9.创建订单
      VoucherOrder order = new VoucherOrder();
      order.setId(redisIdWorker.nextId("order"));
      order.setUserId(userId);
      order.setVoucherId(voucherId);
      save(order);
      // 10.返回订单id
      return Result.ok(order.getId());
      }
      }
      • 用 jmeter 压测发现,用户还是可以购买多张特价券,原因在于:先查询,再判断。在多并发场景下,如果多个用户同时查询,那在判断的时候,就出现了多线程导致的错误。
    • 既然是多线程错误,那加锁吧,但是注意:不能用乐观锁,因为乐观锁用于修改数据时,而这里是查询数据,所以我们要用悲观锁。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      package com.f.service.impl;

      import com.f.dto.Result;
      import com.f.pojo.SeckillVoucher;
      import com.f.pojo.VoucherOrder;
      import com.f.mapper.VoucherOrderMapper;
      import com.f.service.ISeckillVoucherService;
      import com.f.service.IVoucherOrderService;
      import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
      import com.f.utils.RedisIdWorker;
      import com.f.utils.UserHolder;
      import jakarta.annotation.Resource;
      import org.springframework.stereotype.Service;
      import org.springframework.transaction.annotation.Transactional;

      import java.time.LocalDateTime;

      /**
      * <p>
      * 服务实现类
      * </p>
      */
      @Service
      public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
      @Resource
      private ISeckillVoucherService seckillVoucherService;

      @Resource
      private RedisIdWorker redisIdWorker;

      @Override
      public Result seckillVoucher(Long voucherId) {
      // 1.查询优惠券信息
      SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
      // 2.判断秒杀是否开始或结束
      LocalDateTime now = LocalDateTime.now();
      if (voucher.getBeginTime().isAfter(now)
      || voucher.getEndTime().isBefore(now)) {
      // 3.如果秒杀还未开始或者已经结束,返回错误信息
      return Result.fail("秒杀还未开始或已经结束");
      }
      // 4.如果秒杀开始,且没有结束,再判断库存是否充足
      if (voucher.getStock() < 1) {
      // 5.库存不足,返回错误信息
      return Result.fail("库存不足");
      }
      return createVoucherOrder(voucherId);
      }

      @Transactional
      public synchronized Result createVoucherOrder(Long voucherId) { // 使用悲观锁
      // 6.判断用户是否已经买过该特价券了
      Long userId = UserHolder.getUser().getId();
      long count = query().eq("user_id", userId).eq("voucher_id", voucherId)
      .count();
      // 7.如果用户已经买过该特价券,就返回错误信息
      if (count > 0) {
      return Result.fail("您已经购买过该特价券,每人仅限一张");
      }
      // 8.库存充足,扣减库存
      boolean success = seckillVoucherService.update()
      .setSql("stock = stock -1") // set stock = stock - 1
      .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
      .update();
      if (!success) {
      return Result.fail("库存不足");
      }
      // 9.创建订单
      VoucherOrder order = new VoucherOrder();
      order.setId(redisIdWorker.nextId("order"));
      order.setUserId(userId);
      order.setVoucherId(voucherId);
      save(order);
      // 10.返回订单id
      return Result.ok(order.getId());
      }
      }
  • 通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了,因为在集群模式下,我们会有多个 jvm 的锁监视器,就导致可能出现同一个用户的多个线程都能执行的情况。具体可以看视频 P55

    • 为了解决这个问题,我们需要使用分布式锁。

2.3.5 ★★★分布式锁

  • 在集群模式下,synchronized 锁失效了,**synchronized 只能保证单个 jvm 内部的多个线程之间互斥**,而没有办法让集群模式下的多个 jvm 进程之间互斥。
2.3.5.1 分布式锁-原理
  • 为了实现多个 jvm 进程之间的互斥,我们不去使用 jvm 内部的锁监视器,而是在外部开一个锁监视器,让它监视所有的线程。

  • 分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

    分布式锁需要满足的特性:

    • 多进程可见,互斥,高可用,高性能,安全性。
  • 分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

    • MySQL:MySQL 本身带有锁机制,但是由于 MySQL 性能一般,所以采用分布式锁的情况下,使用 MySQL 作为分布式锁比较少见。
    • Redis:Redis 作为分布式锁比较常见,利用 setnx 方法,如果 Key 插入成功,则表示获取到锁,插入失败则表示无法获取到锁。
    • Zookeeper:Zookeeper 也是企业级开发中比较好的一个实现分布式锁的方案。
    MySQLRedisZookeeper
    互斥利用 MySQL 本身的互斥锁机制利用 setnx 互斥命令利用节点的唯一性和有序性
    高可用
    高性能一般一般
    安全性断开链接,自动释放锁利用锁超时时间,到期释放临时节点,断开链接自动释放
2.3.5.2 分布式锁-Redis实现
  • 我们在这里基于 Redis 来实现分布式锁:

    实现分布式锁时需要实现的两个基本方法:

    • 获取锁

      • 互斥:确保只能有一个线程获取锁。

        1
        2
        # 添加锁,利用setnx的互斥特性
        SETNX lock thread1
    • 释放锁

      • 手动释放:

        1
        2
        # 释放锁,删除即可
        DEL lock
      • 超时释放:获取锁时添加一个超时时间。

        1
        2
        # 添加锁过期时间避免服务宕机引起的死锁
        EXPIRE lock 10
  • 思考:如何做到添加锁操作和超时释放锁操作必须同时成功或者同时失败

    避免刚添加完锁后,服务器宕机,不能进行超时释放锁的操作。

    • SET 操作和 EXPIRE 操作写在同个语句即可:SET lock thread1 EX 10 NX (NX 表示 key 不存在的时候才可以 SET)。
  • 接下来基于 Redis 实现分布式锁的初级版本:

    • 定义分布式锁的接口:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      package com.f.utils;

      /**
      * @author fzy
      * @date 2024/3/9 9:41
      */
      public interface ILock {
      /**
      * 获取锁
      *
      * @param timeoutSec 锁的超时时间,过期后自动释放
      * @return true代表获取锁成功,false代表失败
      */
      boolean tryLock(long timeoutSec);

      /**
      * 释放锁
      */
      void unlock();
      }
    • 实现接口:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      package com.f.utils;

      import org.springframework.data.redis.core.StringRedisTemplate;

      import java.time.Duration;

      /**
      * @author fzy
      * @date 2024/3/9 9:44
      */
      public class SimpleRedisLock implements ILock {
      private String key;
      private StringRedisTemplate stringRedisTemplate;
      private static final String KEY_PREFIX = "lock:";

      public SimpleRedisLock(String key, StringRedisTemplate stringRedisTemplate) {
      this.key = key;
      this.stringRedisTemplate = stringRedisTemplate;
      }

      @Override
      public boolean tryLock(long timeoutSec) {
      // 获取当前线程标识
      Long threadId = Thread.currentThread().getId();
      return Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
      .setIfAbsent(KEY_PREFIX + key, threadId.toString(), Duration.ofSeconds(timeoutSec)));
      }

      @Override
      public void unlock() {
      stringRedisTemplate.delete(KEY_PREFIX + key);
      }
      }
    • 在一人一单业务中使用分布式锁:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      package com.f.service.impl;

      import com.f.dto.Result;
      import com.f.pojo.SeckillVoucher;
      import com.f.pojo.VoucherOrder;
      import com.f.mapper.VoucherOrderMapper;
      import com.f.service.ISeckillVoucherService;
      import com.f.service.IVoucherOrderService;
      import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
      import com.f.utils.RedisIdWorker;
      import com.f.utils.SimpleRedisLock;
      import com.f.utils.UserHolder;
      import jakarta.annotation.Resource;
      import org.springframework.aop.framework.AopContext;
      import org.springframework.data.redis.core.StringRedisTemplate;
      import org.springframework.stereotype.Service;
      import org.springframework.transaction.annotation.Transactional;

      import java.time.LocalDateTime;

      /**
      * <p>
      * 服务实现类
      * </p>
      */
      @Service
      public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
      @Resource
      private ISeckillVoucherService seckillVoucherService;
      @Resource
      private RedisIdWorker redisIdWorker;
      @Resource
      private StringRedisTemplate stringRedisTemplate;

      @Override
      public Result seckillVoucher(Long voucherId) {
      // 1.查询优惠券信息
      SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
      // 2.判断秒杀是否开始或结束
      LocalDateTime now = LocalDateTime.now();
      if (voucher.getBeginTime().isAfter(now)
      || voucher.getEndTime().isBefore(now)) {
      // 3.如果秒杀还未开始或者已经结束,返回错误信息
      return Result.fail("秒杀还未开始或已经结束");
      }
      // 4.如果秒杀开始,且没有结束,再判断库存是否充足
      if (voucher.getStock() < 1) {
      // 5.库存不足,返回错误信息
      return Result.fail("库存不足");
      }
      // 使用分布式锁(对同一个用户的id上锁)
      SimpleRedisLock lock = new SimpleRedisLock("order:" + UserHolder.getUser().getId(), stringRedisTemplate);
      // 获取锁
      boolean isLock = lock.tryLock(10L);
      if (!isLock) { // 获取锁失败
      return Result.fail("您已经购买过该特价券,每人仅限一张");
      }
      try {
      IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); // 代理对象才能实现事务
      return proxy.createVoucherOrder(voucherId);
      } finally {
      lock.unlock(); // 释放锁
      }
      }

      @Transactional
      public Result createVoucherOrder(Long voucherId) {
      // 6.判断用户是否已经买过该特价券了
      Long userId = UserHolder.getUser().getId();
      long count = query().eq("user_id", userId).eq("voucher_id", voucherId)
      .count();
      // 7.如果用户已经买过该特价券,就返回错误信息
      if (count > 0) {
      return Result.fail("您已经购买过该特价券,每人仅限一张");
      }
      // 8.库存充足,扣减库存
      boolean success = seckillVoucherService.update()
      .setSql("stock = stock -1") // set stock = stock - 1
      .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
      .update();
      if (!success) {
      return Result.fail("库存不足");
      }
      // 9.创建订单
      VoucherOrder order = new VoucherOrder();
      order.setId(redisIdWorker.nextId("order"));
      order.setUserId(userId);
      order.setVoucherId(voucherId);
      save(order);
      // 10.返回订单id
      return Result.ok(order.getId());
      }
      }
2.3.5.3 Redis分布式锁误删问题
  • 考虑这样一种极端情况:

    • 线程 1 首先获取分布式锁,然后执行自己的业务,但是线程 1 的业务阻塞了,超过了分布式锁的 TTL,所以分布式锁被超时释放了,此时线程 1 还在执行业务。
    • 线程 2 来获取分布式锁,因为线程 1 的分布式锁被超时释放,所以线程 2 可以获取分布式锁,然后执行自己的业务。
    • 此时问题来了,在线程 2 执行自己的业务时,线程 1 的业务完成,它开始手动释放锁,因为线程 1 和线程 2 共用一把锁,这就导致,线程 1 将线程 2 的分布式锁释放了
    • 线程 3 再来获取分布式锁,因为分布式锁被线程 1 释放了(按理来说应该是由线程 2 来释放的),所以线程 3 可以获取分布式锁,然后执行自己的业务。

    这就造成了分布式锁误删的问题。

    • 为了解决这一问题,我们需要在释放锁的时候判断锁的标识是否一致,Redis 锁的标识一般是指 value 的区分,前面在获取锁时,我们是将线程标识,即线程 id 作为 value 存入,因此在释放锁时,只要判断当前线程的 id 是否为当前锁的 value 即可。

      • 这里的线程标识,我们之前用的是线程 id 进行标识,但是如果放到集群线程下,多个 jvm 可能会出现同个线程 id 的线程,这样会引发线程安全问题,所以这里要用 ThreadID + UUIDUUID 来区分不同的 jvm,用 ThreadID 来区分不同的线程
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      package com.f.utils;

      import cn.hutool.core.lang.UUID;
      import org.springframework.data.redis.core.StringRedisTemplate;

      import java.time.Duration;

      /**
      * @author fzy
      * @date 2024/3/9 9:44
      */
      public class SimpleRedisLock implements ILock {
      private String key;
      private StringRedisTemplate stringRedisTemplate;
      private static final String KEY_PREFIX = "lock:";
      private static final String UUID_PREFIX = UUID.randomUUID().toString(true);

      public SimpleRedisLock(String key, StringRedisTemplate stringRedisTemplate) {
      this.key = key;
      this.stringRedisTemplate = stringRedisTemplate;
      }

      @Override
      public boolean tryLock(long timeoutSec) {
      // 获取当前线程标识
      String threadId = UUID_PREFIX + "-" + Thread.currentThread().getId();
      return Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
      .setIfAbsent(KEY_PREFIX + key, threadId, Duration.ofSeconds(timeoutSec)));
      }

      @Override
      public void unlock() {
      // 获取线程标识
      String threadId = UUID_PREFIX + "-" + Thread.currentThread().getId();
      // 获取锁中标识
      String value = stringRedisTemplate.opsForValue().get(KEY_PREFIX + key);
      // 判断线程标识和锁中标识是否一致
      if (threadId.equals(value)) {
      // 一致则释放锁
      stringRedisTemplate.delete(KEY_PREFIX + key);
      }
      }
      }
2.3.5.4 分布式锁的原子性问题
  • 我们考虑一种比较极端的情况:

    • SimpleRedisLockunlock 方法中,“判断线程标识和锁中标识是否一致” 和 “释放锁” 是两步操作,在这两步操作中,如果发生了阻塞,就有可能会出现并发问题:

      • 线程 1 首先获取分布式锁,然后执行自己的业务,线程 1 执行完自己的业务,并判断线程标识和锁中标识一致,就在线程 1 要释放分布式锁时,发生了阻塞(例如 jvm 的垃圾回收机制会阻塞所有代码)。
      • 线程 1 一直阻塞直到超时释放锁,此时线程 2 来获取分布式锁,因为线程 1 的分布式锁被超时释放,所以线程 2 可以获取分布式锁,然后执行自己的业务。
      • 此时问题来了,在线程 2 执行自己的业务时,线程 1 的阻塞结束,因为线程 1 在之前已经判断线程标识和锁中标识一致,所以线程 1 接下来直接执行释放锁的操作,于是线程 1 将线程 2 的分布式锁释放了。
      • 线程 3 再来获取分布式锁,因为分布式锁被线程 1 释放了(按理来说应该是由线程 2 来释放的),所以线程 3 可以获取分布式锁,然后执行自己的业务。

      同样出现了分布式锁被误删的问题,这里的原因在于:“判断线程标识和锁中标识是否一致” 和 “释放锁” 是两步操作,存在原子性问题。

Lua脚本
  • Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。Lua 是一种编
    程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html

    • Lua 语言调用 Redis:redis.call('命令名称', 'key', '其他参数', ...)

      • 写好脚本以后,使用 Redis 命令来调用脚本:EVAL script numkeys key [key ...] arg [arg ...]
  • 在介绍了 Lua 脚本后,再来看看之前分布式锁的原子性问题:

    • 目前 unlock 的业务流程:

      1. 获取锁中的线程标识
      2. 判断是否与指定的标识(当前线程标识)一致
      3. 如果一致则释放锁(删除)
      4. 如果不一致则什么都不做

      用 Lua 脚本:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      -- 锁的key
      -- local key = KEYS[1]
      -- 当前线程标识
      -- local threadId = ARGV[1]
      -- 获取锁中的线程标识
      local id = redis.call('get',KEYS[1])
      -- 比较
      if(id == ARGV[1]) then
      return redis.call('del',KEYS[1])
      end
      return 0
Java调用Lua脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.f.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.time.Duration;
import java.util.Collections;

/**
* @author fzy
* @date 2024/3/9 9:44
*/
public class SimpleRedisLock implements ILock {
private String key;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private static final String UUID_PREFIX = UUID.randomUUID().toString(true);
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

public SimpleRedisLock(String key, StringRedisTemplate stringRedisTemplate) {
this.key = key;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLock(long timeoutSec) {
// 获取当前线程标识
String threadId = UUID_PREFIX + "-" + Thread.currentThread().getId();
return Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + key, threadId, Duration.ofSeconds(timeoutSec)));
}

@Override
public void unlock() {
// 调用Lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + key),
UUID_PREFIX + "-" + Thread.currentThread().getId());
}
}
  • 总结:
    • 基于 Redis 的分布式锁的实现思路:
      • 利用 set nx ex 获取锁,并设置过期时间,保存线程标识;
      • 释放锁时先判断线程标识是否与自己一致,一致则删除锁,并使用 Lua 脚本保证原子性操作
    • 特性:
      • 利用 set nx 满足互斥性
      • 利用 set ex 保证故障时锁依然能释放,避免死锁,提高安全性
      • 利用 Redis 集群保证高可用和高并发特性

2.3.6 Redisson

  • 基于 setnx 实现的分布式锁存在下面的问题:
    • 不可重入:同一个线程无法多次获取同一把锁。
    • 不可重试:获取锁只尝试一次就返回 false,没有重试机制。
    • 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患。
    • 主从一致性:如果 Redis 提供了主从集群,由于主从同步存在延迟,当主宕机时,如果从还未同步主中的锁数据,则会出现主从一致性问题。
  • Redisson 可以帮助我们,Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid),它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
2.3.6.1 Redisson入门
  • 使用 Redisson 的步骤:

    1. 导入 Redisson 依赖:

      1
      2
      3
      4
      5
      6
      <!--redisson依赖-->
      <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>3.13.6</version>
      </dependency>
    2. 配置 Redisson 客户端:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      package com.f.config;

      import org.redisson.Redisson;
      import org.redisson.api.RedissonClient;
      import org.redisson.config.Config;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;

      /**
      * @author fzy
      * @date 2024/3/12 21:47
      */
      @Configuration
      public class RedissonConfig {
      @Bean
      public RedissonClient redissonClient() {
      // 配置类
      Config config = new Config();
      // 添加Redis地址
      config.useSingleServer().setAddress("redis://192.168.44.132:6379").setPassword("123456");
      // 返回RedissonClient对象
      return Redisson.create(config);
      }
      }
    3. 使用 Redisson 的分布式锁:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      @Resource
      private RedissonClient redissonClient;

      @Override
      public Result seckillVoucher(Long voucherId) {
      ......
      // 使用Redisson的分布式锁(对同一个用户的id上锁)
      RLock lock = redissonClient.getLock("lock:order:" + UserHolder.getUser().getId());
      // 获取锁
      boolean isLock = lock.tryLock();
      if (!isLock) { // 获取锁失败
      return Result.fail("您已经购买过该特价券,每人仅限一张");
      }
      try {
      IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); // 代理对象才能实现事务
      return proxy.createVoucherOrder(voucherId);
      } finally {
      lock.unlock(); // 释放锁
      }
      }
2.3.6.2 Redisson可重入锁原理
  • 为什么要引入可重入锁这种机制?

    • 假如有一个线程 1 获得了对象 A 的锁,那么该线程 1 如果在未释放锁前再次请求该对象的锁时,如果没有可重入锁的机制,线程 1 是不会再次获取到锁的,这样的话就会出现死锁的情况。
    • 可重入锁的概念就是:自己可以获取自己的锁。
  • 为了实现可重入锁,就要在锁已经存在的情况下进行判断,判断当前锁是否是自己的锁,如果是自己的锁,就可以再次获取,并将获取次数加 1,如果不是自己的锁,就获取锁失败。

    • 因为要另外记录重入次数,所以不能再用 String 类型的数据结构,而是要使用 Hash 类型的数据结构:

      • 第一次获取锁时,value 值为 1,然后每重入一次,value 值加一。
  • 释放锁时,不是直接删除锁,而是将锁的 value 值减一,直到 value 为 0 的时候才可以删除锁。

    因为 Hash 类型的 Redis 命令没有 set ex nx 这种命令,所以我们只能先判断是否存在(exist),再设置过期时间。为了保证原子性操作,我们要将上图中锁的业务逻辑写在 Lua 脚本里。

  • Redisson 的可重入锁就利用了上面类似的原理。

2.3.6.3 Redisson锁重试和WatchDog机制
  • Redisson 分布式锁原理:

    • 可重入:利用 Hash 结构记录线程标识和重入次数。
    • 可重试:利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制。
    • 超时续约:利用 WatchDog 机制,每隔一段时间(leaseTime / 3),重置超时时间。

    剩下还有一个主从一致性问题。

2.3.6.4 Redisson的multiLock原理
  • 既然使用 Redis 的主从集群会出现主从一致性问题,那就干脆不使用主从集群,而是将每个 Redis 节点都作为主节点。

    • 在获取锁时,应用会向每个 Redis 主节点都获取锁,即向每个 Redis 主节点都进行 set nx ex 操作,只有向每个 Redis 主节点进行的 set nx ex 操作都成功了,应用才能真正获取锁
      • 为了 Redis 集群的高性能,可以在每个 Redis 主节点后面再挂上从节点。
    • 此时,就算其中一个 Redis 主节点宕机了,相应的从节点中没有保存锁数据,也不会出现并发问题:
      • 就算别的应用能对该 “新的主节点” 进行 set nx ex 操作来获取锁(因为没有保存锁数据),但是该应用不能对其他的 Redis 主节点进行获取锁的操作,而我们说过,只有向每个 Redis 主节点进行的 set nx ex 操作都成功了,应用才能真正获取锁,所以该应用并不能获取锁。

    这样,就不会因为主从一致性,而导致并发问题。

    • 这种将多个锁联合在一起,成为一个联锁的方案,就是 multiLock
2.3.6.5 ★总结
  • 分布式锁总结
    • 不可重入Redis分布式锁:
      • 原理:利用 setnx 的互斥性;利用 ex 避免死锁;释放锁时判断线程标识
      • 缺陷:不可重入、无法重试、锁超时失效
    • 可重入Redis分布式锁
      • 原理:利用 Hash 结构,记录线程标识和重入次数;利用信号量控制锁重试等待;利用 WatchDog 延续锁时间
        • 解决了不可重入、无法重试、锁超时失效的问题
      • 缺陷:Redis 主节点宕机引起锁失效问题
    • **multiLock**:
      • 原理:多个独立的 Redis 节点,必须在所有节点都获取重入锁,才算获取锁成功
      • 缺陷:运维成本高、实现复杂

2.3.7 Redis秒杀优化

  • 由于目前的优惠券秒杀业务包含以下几步:

    • 查询优惠券,判断秒杀库存
    • 查询订单,校验一人一单(用到分布式锁,保证一人一单
    • 减库存(用到乐观锁,防止超卖
    • 创建订单

    这几步都涉及到和数据库的操作,所以业务的性能并不高,业务耗时较长。

    为了提高业务性能,考虑对业务流程进行优化:

    • 判断秒杀库存和校验一人一单的步骤完成后,其实就可以返回结果给客户,而不需要等到减库存和创建订单的步骤完成,特别是减库存和创建订单涉及数据库的写操作,耗时会比较久。

      • 可以在判断秒杀库存和校验一人一单的步骤完成后,就将优惠券id、用户id、订单id的相关信息保存到阻塞队列,然后就返回结果给客户。而且通过 Redis 进行这两步来提高性能。
      • 接着由另外的线程异步读取阻塞队列中的信息,完成减库存和创建订单的操作。

      通过异步执行优惠券秒杀业务,来提高业务性能。

  • 用 String 类型来存储库存,用 Set 类型来存储用户 id(保证唯一性,即保证一人一单)

  • 改进秒杀业务,提高并发性能。

    需求:

    1. 新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中
    2. 基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
    3. 如果抢购成功,将优惠券 id 和用户 id 封装后存入阻塞队列
    4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
2.3.7.1 基于Redis完成秒杀资格的判断
  • 新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
    // 保存秒杀库存到redis中
    stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
    }
  • 基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    -- 1.参数列表
    -- 1.1 优惠券id
    local voucherId = ARGV[1]
    -- 1.2用户id
    local userId = ARGV[2]

    -- 2.数据key
    -- 2.1 库存key
    local stockKey = 'seckill:stock:'..voucherId
    -- 2.2 订单key
    local orderKey = 'seckill:order:'..voucherId

    -- 3.脚本业务
    -- 3.1判断库存是否充足
    if (tonumber(redis.call('get', stockKey)) <= 0)
    then return 1
    end
    -- 3.2判断用户是否下单 sismember orderKey userId
    if(redis.call('sismember',orderKey,userId)==1)
    then return 2
    end
    -- 3.3扣库存 incrby stockKey -1
    redis.call('incrby',stockKey,-1)
    -- 3.4下单 sadd orderKey userId
    redis.call('sadd',orderKey,userId)
    return 0
2.3.7.2 基于阻塞队列实现异步秒杀下单
  • 如果抢购成功,将下单信息封装后存入阻塞队列:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Override
    public Result seckillVoucher(Long voucherId) {
    // 1.执行lua脚本
    Long userId = UserHolder.getUser().getId();
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
    Collections.emptyList(),
    voucherId.toString(), userId.toString());
    // 2.判断结果是否为0
    // 3.不为0则返回错误信息
    if (!result.equals(0L)) {
    return Result.fail(result.equals(1L) ? "库存不足" : "您已经购买过该特价券,每人仅限一张");
    }
    // 4.为0则将下单信息保存到阻塞队列中
    VoucherOrder order = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    order.setId(orderId);
    order.setUserId(userId);
    order.setVoucherId(voucherId);
    orderTasks.add(order);
    // 5.返回订单id
    return Result.ok(orderId);
    }
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
    while (true) {
    try {
    // 1.获取队列中的订单信息
    VoucherOrder voucherOrder = orderTasks.take();
    // 2.减少库存,创建订单
    // 2.1库存充足,扣减库存
    boolean success = seckillVoucherService.update()
    .setSql("stock = stock -1") // set stock = stock - 1
    .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
    .update();
    if (!success) {
    log.error("库存不足");
    return;
    }
    // 2.2创建订单
    save(voucherOrder);
    } catch (Exception e) {
    log.error("处理订单异常", e);
    }
    }
    }
    }
  • 最终优化后的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    /**
    * <p>
    * 服务实现类
    * </p>
    */
    @Slf4j
    @Service
    public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RedissonClient redissonClient;
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
    }

    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 创建阻塞队列
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
    while (true) {
    try {
    // 1.获取队列中的订单信息
    VoucherOrder voucherOrder = orderTasks.take();
    // 2.减少库存,创建订单
    // 2.1库存充足,扣减库存
    boolean success = seckillVoucherService.update()
    .setSql("stock = stock -1") // set stock = stock - 1
    .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 添加了乐观锁
    .update();
    if (!success) {
    log.error("库存不足");
    return;
    }
    // 2.2创建订单
    save(voucherOrder);
    } catch (Exception e) {
    log.error("处理订单异常", e);
    }
    }
    }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
    // 1.执行lua脚本
    Long userId = UserHolder.getUser().getId();
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
    Collections.emptyList(),
    voucherId.toString(), userId.toString());
    // 2.判断结果是否为0
    // 3.不为0则返回错误信息
    if (!result.equals(0L)) {
    return Result.fail(result.equals(1L) ? "库存不足" : "您已经购买过该特价券,每人仅限一张");
    }
    // 4.为0则将下单信息保存到阻塞队列中
    VoucherOrder order = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    order.setId(orderId);
    order.setUserId(userId);
    order.setVoucherId(voucherId);
    orderTasks.add(order);
    // 5.返回订单id
    return Result.ok(orderId);
    }
    }

2.3.8 Redis消息队列实现异步秒杀

  • 上一节我们基于 jvm 的阻塞队列进行秒杀优化存在 2 个问题:

    1. jvm 的内存限制问题。
    2. 数据安全问题:jvm 的内存数据没有持久化,每当服务器重启或者宕机或者从阻塞队列取的时候遇到异常,数据都会丢失。

    解决办法: 消息队列。

  • 消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括 3 个角色:

    • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
    • 生产者:发送消息到消息队列
    • 消费者:从消息队列获取消息并处理消息
  • 视频中后面讲的是 Redis 自己提供的消息队列,而不是用市面上主流的消息队列,所以就不记笔记了。

2.4 关注推送(Feed流)

  • 关注推送也叫做 Feed 流,直译为投喂。为用户持续的提供 “沉浸式” 的体验,通过无限下拉刷新获取新的信息。

    • Feed 流产品有两种常见模式:

      • Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注,例如朋友圈。
      • 优点:信息全面,不会有缺失。并且实现也相对简单
        • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
      • 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容,推送用户感兴趣信息来吸引用户。
        • 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
        • 缺点:如果算法不精准,可能起到反作用

      本例中的个人页面,是基于关注的好友来做 Feed 流,因此采用 Timeline 的模式。该模式的实现方案有三种:

      • 拉模式(读扩散)
    • 推模式(写扩散)

      • 推拉结合(读写混合)

      | | 拉模式 | 推模式 | 推拉结合 |

    | :———-: | :——: | :—————: | :——————-: |
    | 写比例 | 低 | 高 | 中 |
    | 读比例 | 高 | 低 | 中 |
    | 用户读取延迟 | 高 | 低 | 低 |
    | 实现难度 | 复杂 | 简单 | 很复杂 |
    | 使用场景 | 很少使用 | 用户量少,没有大V | 过千万的用户量,有大V |

    • 这三种模式的具体动画表示可以看视频 P84
  • 本例采用实现简单的推模式,通过将用户发布的文章推送到粉丝的收件箱来实现关注推送。

2.4.1 基于推模式实现关注推送

  • 需求:

    • 修改新增探店笔记的业务,在保存 blog 到数据库的同时,推送到粉丝的收件箱

    • 收件箱满足可以根据时间戳排序,必须用 Redis 的数据结构实现

      • 根据目前所学,List 或者 SortedSet 类型的数据结构都能满足要求。
    • 查询收件箱数据时,可以实现分页查询

      • Feed 流的分页问题:Feed 流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。

        需要使用滚动分页,通过记录上一次读取的页数的 lastId 来避免重复读取的情况。

        • 因为要实现分页查询的滚动分页,所以 List 类型的数据结构就不能满足要求了,因为它只能根据角标来分页,而在 Feed 流中角标是会变化的。由于 SortedSet 类型的数据结构有 ZRANGEBYSCORE key min max 命令,所以就可以满足我们的要求。
  • 修改新增探店笔记的业务,在保存 blog 到数据库的同时,推送到粉丝的收件箱:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Override
    public Result saveBlog(Blog blog) {
    // 1.获取登录用户
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId());
    // 2.保存探店笔记
    boolean isSuccess = save(blog);
    if (!isSuccess) {
    return Result.fail("保存笔记失败");
    }
    // 3.查找所有关注了作者的粉丝
    List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
    // 4.将笔记的id推送到粉丝的收件箱
    for (Follow follow : follows) {
    Long userId = follow.getUserId(); // 粉丝id
    stringRedisTemplate.opsForZSet().add(RedisConstants.FEED_KEY + userId,
    blog.getId().toString(),
    System.currentTimeMillis());
    }
    // 5.返回笔记id
    return Result.ok(blog.getId());
    }
滚动分页
  • 查询收件箱数据时,可以实现分页查询:

    • 滚动分页查询主要使用 SortedSetZRANGEBYSCORE 方法,因为以时间戳作为 score,越新的笔记排名理应越靠前,所以要降序排序,所以应该使用 ZREVRANGEBYSCORE 方法,主要涉及四个参数:

      ZREVRANGEBYSCORE key max min [WITHSCORES][LIMIT offset count]

      • max:指定范围对应的最大 score

        • 第一次查询给当前时间戳,往后查询给 “上一次查询的 min
      • min:指定范围对应的最小 score

        • 因为 score 为时间戳,所以恒为正数,那么把 min 固定为 0 即可
      • offset:读取偏移量,为 0 表示从 max 所在的数据开始向下读 count 条数据

        • 第一次查询给 0,往后查询给 “上一次查询的 min 对应的数据个数”,避免好几条数据因为 score 值相同而出现问题,例如下面这样,m6 被重复查询了。
      • count:读取数据的条数

        • 由前端传值,来决定一页显示几条数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    package com.f.dto;

    import lombok.Data;

    import java.util.List;

    @Data
    public class ScrollResult {
    private List<?> list;
    private Long minTime; // 查询到的min,作为下一次查询的max
    private Integer offset; // 读取偏移量
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    @Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
    // 1.获取当前用户
    Long userId = UserHolder.getUser().getId();
    // 2.根据当前用户找到对应的收件箱,并根据max和offset找到收件箱对应的片段
    String key = RedisConstants.FEED_KEY + userId;
    Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
    .reverseRangeByScoreWithScores(key, 0, max, offset, 3);
    if (tuples == null || tuples.isEmpty()) { // 非空判断
    return Result.ok();
    }
    // 3.解析数据:blogId、minTime、offset
    List<String> blogIds = new ArrayList<>();
    long minTime = 0;
    int count = 1;
    for (ZSetOperations.TypedTuple<String> tuple : tuples) {
    // 获取blogId
    String blogId = tuple.getValue();
    blogIds.add(blogId);
    // 获取时间戳和offset
    long time = tuple.getScore().longValue();
    if (minTime == time) {
    count++;
    } else {
    minTime = time;
    count = 1;
    }
    }
    // 4.根据blogId查询对应的博客并封装
    List<Blog> blogs = new ArrayList<>();
    for (String blogId : blogIds) {
    Blog blog = query().eq("id", blogId).one();
    queryBlogUser(blog);
    blogs.add(blog);
    }
    // 5.返回结果
    ScrollResult scrollResult = new ScrollResult();
    scrollResult.setList(blogs);
    scrollResult.setMinTime(minTime);
    scrollResult.setOffset(count);
    return Result.ok(scrollResult);
    }

2.5 UV统计

  • UV:全称 Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1 天内同一个用户多次访问该网站,只记录 1 次。

    • 相应的还有一个 PV 概念:全称 Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录 1 次 PV,用户多次打开页面,则记录多次 PV,PV 往往用来衡量网站的流量。

    UV 统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存,但是如果每个访问的用户都保存到 Redis 中,数据量会非常恐怖。所以我们采用 HyperLogLog 来进行统计。

2.5.1 HyperLogLog

  • Hyperloglog(HLL)是从 Loglog 算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。

    • Redis 中的 HLL 是基于 String 结构实现的,单个 HLL 的内存永远小于 16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于 0.81% 的误差,不过对于 UV 统计来说,这完全可以忽略。

      下面是 Redis 中 HyperLogLog 的相关命令:

2.5.2 使用HyperLogLog实现UV统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
void testHyperLogLog() {
String[] ids = new String[1000];
int j;
for (int i = 0; i < 1000000; i++) {
j = i % 1000;
ids[j] = "user_" + (i);
if (j == 999) { // 每一千条数据推一次到Redis
stringRedisTemplate.opsForHyperLogLog().add("hll", ids);
}
}
Long size = stringRedisTemplate.opsForHyperLogLog().size("hll"); // 统计
System.out.println(size);
}
---------------The End---------------