redis项目-分布式锁
秒杀业务
秒杀下单应该思考的内容:
下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
下单核心逻辑分析:
当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件
比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。
最基本的逻辑,首先判断时间和库存,再进行库存扣减,如果成功扣减库存就下订单
1 |
|
超卖问题
多线程并发,这样的逻辑当出现最后一张的时候多个线程涌入就会变成负数,无需多言
一人一单
几个阶段:
- 最简单逻辑,只要加上一个对于订单的查询就可,查询条件是当前的用户id和秒杀券id
- 但是发现这样对于两个同用户的线程还是会造成不是一人一单的情况,这个时候我们需要悲观锁
1
2
3
4
public synchronized Result createVoucherOrder(Long voucherId) {
...
} - 首先是加在方法上,这样粒度有点大,锁的是整个方法,我们考虑细化锁,用代码块,那么这个锁的参数是什么呢
1
2
3
4
5
6
7
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
...
}
} - 对于一个用户的id肯定是可以互相区分的,我们就锁这个id,但是id的tostring方法是一个new的过程,也就是说我们用tostring都是一个新的,这个时候需要用jvm的知识了,直接在字符串常量池里面找到这个对象,这样就确保了是唯一的一个对象。
1
2
3
4
5
6
7synchronized (user.getId().toString().intern()){
//代理对象是spring在初始化的时候包装在我们类上的另一个类,他会在代理对象中通过trycatch实现事务
//那么如果我们直接使用了proxy这个类,就可以得到事务的支持
//通过aop实现,但是要在启动类上暴露aop
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} - 但是事务是在锁的外面的,也就是说有可能锁释放了但是还没有写入数据库,这样还是有隐患,所以我们需要将锁的范围比事务大,而事务是写在函数上的,那么就需要在调用这个函数的地方上锁,而不是在这个函数里面上锁。
- 最后需要让事务生效,要用到代理对象,这样就有了一个完整的方案。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
private ISeckillVoucherService seckillVoucherService;
private RedisIdWorker redisIdWorker;
private StringRedisTemplate stringRedisTemplate;
public Result seckillVoucher(Long voucherId) {
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
if (voucher.getStock() < 1){
return Result.fail("库存不足");
}
//一人一单,是同一个用户的并发安全问题,所以加锁的是userId
UserDTO user = UserHolder.getUser();
//分布式锁
SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate,"order:"+user.getId());
boolean isLock = lock.tryLock(1200);
//这里如果没有获取到锁,说明有一个相同用户id的人已经在下单了,所以直接返回不允许重复
if (!isLock){
return Result.fail("不允许重复下单");
}
/*synchronized (user.getId().toString().intern()){
//代理对象是spring在初始化的时候包装在我们类上的另一个类,他会在代理对象中通过trycatch实现事务
//那么如果我们直接使用了proxy这个类,就可以得到事务的支持
//通过aop实现,但是要在启动类上暴露aop
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}*/
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
finally {
lock.unlock();
}
}
/**
* 事务的作用域要小于锁,否则会出现锁释放了而事务没结束,有安全隐患
* @param voucherId
* @return
*/
public Result createVoucherOrder(Long voucherId){
//一人一单
UserDTO user = UserHolder.getUser();
Integer count = lambdaQuery()
.eq(VoucherOrder::getUserId,user.getId())
.eq(VoucherOrder::getVoucherId,voucherId)
.count();
if (count>0){
return Result.fail("用户已经购买过了");
}
//更新库存
LambdaUpdateWrapper<SeckillVoucher> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.setSql("stock = stock-1")
.eq(SeckillVoucher::getVoucherId,voucherId)
.ge(SeckillVoucher::getStock,0);
boolean success = seckillVoucherService.update(lambdaUpdateWrapper);
if (!success){
return Result.fail("库存不足");
}
//创建订单
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(user.getId());
save(voucherOrder);
return Result.ok(orderId);
}
}
分布式锁
几个阶段:
- 分布式锁提出是为了解决锁不可见的问题,有了一个全局的锁就可以进行跨服务器的上锁
- 主要的实现操作为setnx,也就是set if not exist为什么这个操作能当锁呢?当第一个线程setnx了,那么value就会是自己的那个,并且返回为真。另一个线程到了上锁的这一段,也setnx,此时由于他要的key已经有了另一个值,不满足not exist,所以就返回否;这个的主要原因还是在于setnx是一个原子命令,一次执行一行就能满足并发的需求
- 第一阶段:我们就用setnx来完成,上锁过程直接将线程号作为值,解锁根据锁名称删除
- 第二阶段:可能出现这样一种情况,线程拿到锁以后卡了,过了释放时间释放给另一个线程,此时线程1缓过来了,执行到解锁过程就把别人的给解锁了。解决方法很简单,只要判断现在这个锁里面的值是不是自己的就可以
- 第三阶段:如果线程1在判断完了之后卡了,判断的结果确实是自己的,但是刚好超时了,这个时候又被别人抢走了,最后释放的还是别人的锁,这是因为锁不是原子性的,检查是否一致和释放要变成原子操作。需要用到lua脚本
- 第四阶段:用到了lua脚本完成了判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private String name;
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
public boolean tryLock(long timeoutsec) {
//这里加uuid是因为在不同机器上的线程标识可能一致
String id = ID_PREFIX+Thread.currentThread().getId();
Boolean b = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX+name, id , timeoutsec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(b);
}
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public void unlock() {
//lua
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());
}
/**
* 这是非原子操作的解锁
*/
public void unlockNotSafe(){
//要判断是不是当前线程才能删除,否则会误删
String redisValue = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
String id = ID_PREFIX+Thread.currentThread().getId();
if (id.equals(redisValue)){
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
}
手写可重入分布式锁
与reentrantLock相似,实现可重入的过程就是一个hash数据结构,小key为线程id,大key为锁名字,值为进入的次数。可重入指的是一个线程可以多次获得这个锁,每次线程进入,value都要+1,同理,出去了就要-1,要保证最后退出的时候为0.
这里使用lua脚本完成,第一次手写遇到了很多困难,主要原因是一个空指针异常,估计是因为我用的模板是stringRedisTemplate,有一个long类型的数据一直报错。
主要思路是,首先看redis中是否有这个锁,如果没有就直接上锁。如果有的话进一步检查线程号是否是自己的,如果是自己的就+1,不是的话就直接退出。
1 | ---key[1]为大key,argv[1]为小key,argv[2]为时间 |
主要思路是判断是否是自己的,如果不是自己的直接退出,如果是自己的在判断是否减到0了,如果减少到0直接删除这个hash
1 | --- 判断是否是自己的,然后在判断是否减掉之后为0,为0则删除 |
改进后的代码如下所示:
1 | //可重入的分布式上锁 |
看门狗机制
开启一个线程监视ttl,如果到了某个阈值程序还没结束就续期
1 | static class WatchDogThread extends Thread { |
剩下的内容我觉得他课程没啥用,自己用消息队列就解决了,没啥技术含量就不写,主要是解耦的操作。
改造后就变成了前面校验完全用lua脚本,保证原子性的同时也不用上锁了,因为redis是单线程。有订单的就mq,kafka处理。
课程也有可取之处,BlockingQueue是我第一次见,这就是一个简易版本的消息队列,在队列为空的时候会阻塞消费者进程,满的时候会阻塞生产者进程。