异常场景
最近一段时间时不时就有开发人员向我反应:redis的key有点问题,帮我删个key值、怎么key没有过期,我明明设了过期时间的。一开始没有放心上,以为只是程序逻辑处理不当或者redis偶尔抽风,不用在意。可是渐渐反应的人多了,觉得可能不是这么简单了,于是就和相关的开发人员讨论了下,发现会出现异常的基本是以下两种场景:
- 使用redis 的 incrBy 命令来防止重复提交,大致的redis交互如下:
incrBy key 1 # 返回 1 表示正常 返回 > 1 表示重复提交 直接 return
expire key 5 # 5秒后过期
del key # 正常业务逻辑走完后 显示删除key
- 使用 redis (set setnx setex ) + expire 组合命令:
set |setnx | setex key
expire key seconds # 设置失败
由于系统中大量使用了第一种方式来防止重复提交,所以这个问题对业务影响还是很大的,redis具体现象为:
- 设置incrby 第一次应该返回 1
- 实际上却返回了 2 ,导致业务流程无法走完,并且key没有删除也没有设置过期时间
查找问题
编写测试程序
为了排查问题写了一个简单的小程序:
@Test
public void test() throws InterruptedException {
final List<Long> list = Lists.newArrayListWithCapacity(5000000);
Long first_id = 500000000000000001L;
for (int i = 0; i < 5000000; i++) {
list.add(first_id++);
}
final CountDownLatch allDone = new CountDownLatch(2);
// incrby
new Thread(
new Runnable() {
public void run() {
try {
testIncrby(list);
} catch (InterruptedException e) {
}
allDone.countDown();
}
}
).start();
new Thread(
new Runnable() {
public void run() {
try {
testIncrByAndDel(list);
} catch (InterruptedException e) {
}
allDone.countDown();
}
}
).start();
allDone.await();
log.info("finish method test");
}
public void testDelAndTTL() throws InterruptedException {
final List<Long> list = Lists.newArrayListWithCapacity(5000000);
Long first_id = 500000000000000001L;
for (int i = 0; i < 5000000; i++) {
list.add(first_id++);
}
final CountDownLatch allDone = new CountDownLatch(2);
new Thread(
new Runnable() {
public void run() {
testDelAndTTL(list, "walletMember");
allDone.countDown();
log.info("finish walletMember keys");
}
}
).start();
new Thread(
new Runnable() {
public void run() {
testDelAndTTL(list, "walletMember1");
allDone.countDown();
log.info("finish walletMember1 keys");
}
}
).start();
allDone.await();
log.info("finish all keys");
}
private void testIncrby(final List<Long> list) throws InterruptedException {
long times = 0L;
for(;;){
times ++;
for (Long key : list) {
Optional<Long> value = redisOperator.incrBy("walletMember" + key, 1);
if (value.isPresent()) {
if (value.get() != 1L) {
log.error("there is something wrong in {}: the incrBy method return wrong value, expact : {} , result : {}, key : {}", "testIncrby", 1, value.get(), "walletMember" + key);
continue;
}
} else {
log.error("there is something wrong in {}: the incrBy method return wrong value, expact : {} , result : {}, key : {}", "testIncrby", 1, null, "walletMember" + key);
}
Optional<Boolean> isSuccess = redisOperator.expire("walletMember" + key, 30);
if (!isSuccess.isPresent() || !isSuccess.get()) {
log.error("there is something wrong in {}: the expire method return wrong value, expact : {} , result : {}, key : {}", "testIncrby", "true", (isSuccess.isPresent() ? isSuccess.get() : null), "walletMember" + key);
}
}
log.info("finish {} method {} times. sleep 35 seconds", "testIncrby", times);
Thread.sleep(60*5*1000L);
if(times > 1000000) {
return ;
}
}
}
private void testIncrByAndDel(final List<Long> list) throws InterruptedException {
long times = 0L;
for(;;) {
times ++;
for (Long key : list) {
Optional<Long> value = redisOperator.incrBy("walletMember1"+key, 1);
if(value.isPresent()) {
if(value.get() != 1L) {
log.error("there is something wrong in {} method: the incrBy method return wrong value, expact : {} , result : {}, key : {}", "testIncrByAndDel", 1, value.get(), "walletMember1" + key);
continue;
}
} else {
log.error("there is something wrong in {} method: the incrBy method return wrong value, expact : {} , result : {}, key : {}", "testIncrByAndDel", 1, null, "walletMember1" + key);
}
Optional<Boolean> isSuccess= redisOperator.expire("walletMember"+key, 60);
if(!isSuccess.isPresent() || !isSuccess.get()) {
log.error("there is something wrong in {}: the expire method return wrong value, expact : {} , result : {}, key : {}", "testIncrByAndDel", "true", (isSuccess.isPresent() ? isSuccess.get() : null), "walletMember1" + key);
}
redisOperator.del("walletMember1"+key);
}
log.info("finish {} method {} times. sleep 35 seconds", "testIncrByAndDel", times);
Thread.sleep(60*5*1000L);
if(times > 1000000) {
return ;
}
}
}
private void testDelAndTTL(final List<Long> list, String prefix) {
for (Long key : list) {
Optional<Integer> value = redisOperator.get(prefix + key, Integer.class);
if(value.isPresent()) {
Long ttl = redisOperator.getFastJsonRedisTemplate().getExpire(prefix + key);
log.error("there is something wrong : key : {}, value : {}, ttl : {}", prefix + key, value.get(), ttl);
}
}
}
可能是测试用例写的不好,线上线下都没有发现明显的问题,于是对代码做了调整:
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static List<String> keys = Lists.newArrayListWithCapacity(100);
(value = "/test/redis", method = RequestMethod.POST)
public String createGoogleAuth() throws InterruptedException {
final int numThreads = 100;
final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
log.info("--------------start------------------");
for(int i = 0; i < 100; i++) {
threadPool.submit(new Runnable() {
public void run() {
redis_incrby();
}
});
}
return "OK";
}
private void redis_incrby() {
final int loops = 2500;
String key = UUID.randomUUID().toString();
keys.add(key);
for(int j=0;j<loops;j++) {
log.info("key : {}, return : {}", key, stringRedisTemplate.opsForValue().increment(key, 1));
log.info("key : {}, expire : {}", key, stringRedisTemplate.expire(key, 5, TimeUnit.SECONDS));
log.info("key : {}, return : {}", key, stringRedisTemplate.opsForValue().increment(key, 1));
log.info("key : {}, expire : {}", key, stringRedisTemplate.expire(key, 5, TimeUnit.SECONDS));
log.info("key : {}, return : {}", key, stringRedisTemplate.opsForValue().increment(key, 1));
log.info("key : {}, expire : {}", key, stringRedisTemplate.expire(key, 5, TimeUnit.SECONDS));
log.info("key : {}, return : {}", key, stringRedisTemplate.opsForValue().increment(key, 1));
log.info("key : {}, expire : {}", key, stringRedisTemplate.expire(key, 5, TimeUnit.SECONDS));
}
stringRedisTemplate.delete(key);
log.info("key : {}, finish.", key);
}
执行几次后终于发现了问题:按照代码逻辑,每个key最后的value应该都为10000,然后有的key的结果大于10000,于是想到两种情况:
- 1)客户端多发送了一次命令
- 2)redis 执行错误
验证原因
为了验证到底是哪种原因,采取了以下方式:
- 修改Jedis 源码:修改BinaryClient的incrBy方法,打印日志信息:
public void incrBy(final byte[] key, final long integer) {
log.error("key: {}, delta : {}", new String(key), integer);
sendCommand(INCRBY, key, toByteArray(integer));
}
- 对redis进行监听,使用watch命令:
./redis-cli -c -h ip -p port watch
- 通过tcpdump 对所有流量记录进行保存
tcpdump -i eth0 -s 0 -w tcp.cap # 保存为.cap文件可以通过wireshark 进行分析
分析原因
通过重新执行上面的测试代码,对日志文件进行筛选:
grep 10001 nohup.out
从中选取一个key:3e9dd73a-fee2-416c-b3c8-5369b2cd4ec8
然后继续筛选:
grep "key : 3e9dd73a-fee2-416c-b3c8-5369b2cd4ec8 return : " nohup.out
仔细查看后发现:
直接从 2698 跳跃到了 2700
然后再筛选日志,发现以下内容:
连续执行了两次incrBy命令,然后再去查找同一时刻的redis watch记录,发现的确收到了两条INCRBY 3e9dd73a-fee2-416c-b3c8-5369b2cd4ec8 1命令,那接下来就要搞清楚为什么会发起两次请求,于是轮到tcpdump了。
wireshark 分析请求流量
将tcpdump的文件导入wireshark后,根据筛选条件进行筛选,最终找到了对应的tcp请求(Frame 698961):
其中这行引起了我注意:[Expert Info (Note/Sequence): This frame is a (suspected) retransmission]
查询资料后发现,这种情况是出现在以下情形:
? 当tcp请求允许重试的前提下发现当前tcp请求过了timeout时间还没收到返回值,那么就发起一次重试,而重试的tcp请求在wireshark中就会被标记为[Expert Info (Note/Sequence): This frame is a (suspected) retransmission]。
于是原因很明显了,由于Jedis设置的超时时间过短,导致jedis发起了重试,最终导致了上述redis的异常情况。
解决方案
Jedis
查询jedis配置文件,发现超时时间为500毫秒,
显然太短了,遂改为15秒,改完以下重新执行了几次测试程序,都没有再发现问题,然后修改线上代码。
Redis
导致上述jedis请求超时的原因也有redis cluster的原因。查询发现线上集群存在着key分布不均匀的情况:
可以看出其中一个节点的key数量是其他节点的1.5倍,并且分析这些key发现他们访问的频率也很高,在一定程度上导致了jedis的超时,因此接下来要对redis集群进行优化。