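I previously wrote a design draft for a first-generation log replay engine ("Design draft of a log replay engine for tens of millions of log entries"). Back then, my understanding of log replay was simply to resend the requests recorded in the logs, and that counted as replaying online user traffic. While reading up on goreplay recently, that understanding was overturned.

After going through some material, I finally got a grasp of the timestamp-based approach. Roughly: a tool records the online traffic for a period of time, including timestamps and other information, and a replay engine then replays that traffic.

Traffic replay currently focuses on HTTP traffic, so the load-generation part of the engine I wrote earlier can still be used. That led me to my own approach.

Approach

The most important piece is the choice of queue. I used Java's java.util.concurrent.DelayQueue and did not find a better framework. At first I wanted to reuse the queue from my earlier log replay framework, and I also ran performance tests on several common queues.

My original plan was to read the log with multiple threads, check whether each entry had reached its time point, put the due entries into a thread-safe queue, and have a thread pool take objects from that queue and send the requests. On reflection this was too complicated: the traffic passes through too many hands, which makes the engine harder to implement and extend.

I then ran a performance test on java.util.concurrent.DelayQueue (see "DelayQueue performance test"), and with those results in hand I could go ahead with confidence. For basic usage of the delay queue, see "Performance test: cancelling an order 10s after placing it".

Implementation

Overall the implementation is fairly straightforward. I'll share it in three parts.

Property definitions
The class holds the replay settings (name, log file name, multiplier, and request handler), the list of parsed logs, the DelayQueue, and a LongAdder request counter.

Producer
Producer threads loop over the parsed logs, shift each timestamp by the difference between the current time and the first recorded timestamp, and add a copy to the delay queue. They pause when the queue holds more than MAX_LENGTH entries.

Consumer
Consumer threads poll the delay queue and hand each due entry to the thread pool, which calls the handler `multiple` times per entry and counts every request.

The full code comes first, followed by my test case and the test results.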
package com.funtester.frame.execute

import com.funtester.base.bean.AbstractBean
import com.funtester.frame.SourceCode
import com.funtester.utils.LogUtil
import com.funtester.utils.RWUtil
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger

import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder

/**
 * Executor class for the replay feature
 */
class ReplayConcurrent extends SourceCode {

    private static Logger logger = LogManager.getLogger(ReplayConcurrent.class)

    static ThreadPoolExecutor executor

    // Run flag checked by the producer, consumer, and statistics loops
    static boolean key = true

    // Upper bound on the delay queue size before producers pause
    static int MAX_LENGTH = 800000

    // Number of producer threads and of consumer threads
    int threadNum = 2

    String name

    String fileName

    // How many times each log entry is sent
    int multiple

    // Request handler, called with the URL of each log entry
    Closure handle

    List<ReplayLog> logs

    DelayQueue<ReplayLog> logDelayQueue = new DelayQueue<ReplayLog>()

    // Requests sent since the last statistics output
    LongAdder total = new LongAdder()

    ReplayConcurrent(String name, String fileName, int multiple, Closure handle) {
        this.name = name
        this.fileName = fileName
        this.multiple = multiple
        this.handle = handle
    }

    void start() {
        if (executor == null) executor = ThreadPoolUtil.createCachePool(THREADPOOL_MAX, "R")
        // Read the log file; the delay queue is only used here to collect the entries
        time({
            RWUtil.readFile(fileName, {
                def delay = new ReplayLog(it)
                if (delay.getTimestamp() != 0) logDelayQueue.add(delay)
            })
        }, 1, "读取日志$fileName")
        logs = logDelayQueue.toList()
        def timestamp = logs.get(0).getTimestamp()
        logDelayQueue.clear()
        AtomicInteger index = new AtomicInteger()
        AtomicInteger size = new AtomicInteger()
        def LogSize = logs.size()
        AtomicInteger diff = new AtomicInteger()
        // Producers: loop over the logs and add time-shifted copies to the delay queue
        threadNum.times {
            fun {
                while (key) {
                    // At the start of each pass, recompute the offset between now and the first log timestamp
                    if (index.get() % LogSize == 0) diff.set(getMark() - timestamp)
                    if (index.get() % MAX_LENGTH == 0) size.set(logDelayQueue.size())
                    // Pause while the queue is over the limit
                    if (size.get() > MAX_LENGTH) {
                        sleep(1.0)
                        size.set(logDelayQueue.size())
                    }
                    def replay = logs.get(index.getAndIncrement() % LogSize)
                    logDelayQueue.add(replay.clone(replay.timestamp + diff.get()))
                }
            }
        }
        // Consumers: take due entries and send the requests through the thread pool
        threadNum.times {
            fun {
                while (key) {
                    def poll = logDelayQueue.poll(1, TimeUnit.SECONDS)
                    if (poll != null) {
                        executor.execute {
                            multiple.times {
                                handle(poll.getUrl())
                                total.add(1)
                            }
                        }
                    }
                }
            }
        }
        // Statistics: log the QPS once per COUNT_INTERVAL
        fun {
            while (key) {
                sleep(COUNT_INTERVAL as double)
                int real = total.sumThenReset() / COUNT_INTERVAL as int
                def active = executor.getActiveCount()
                def count = active == 0 ? 1 : active
                logger.info("{} ,实际QPS:{} 活跃线程数:{} 单线程效率:{}", name, real, active, real / count as int)
            }
        }
    }

    /**
     * Stop the replay
     */
    def stop() {
        key = false
        executor.shutdown()
        logger.info("replay压测关闭了!")
    }
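
    /**
     * Log entry object
     */
    static class ReplayLog extends AbstractBean implements Delayed {

        int timestamp

        String url

        ReplayLog(String logLine) {
            def log = LogUtil.getLog(logLine)
            this.url = log.getUrl()
            this.timestamp = log.getTime()
        }

        ReplayLog(int timestamp, String url) {
            this.timestamp = timestamp
            this.url = url
        }

        @Override
        long getDelay(TimeUnit unit) {
            // DelayQueue asks for the remaining delay in the unit it passes in, so convert explicitly.
            // Assumes timestamp and getMark() are both second-level timestamps.
            return unit.convert(this.timestamp - getMark(), TimeUnit.SECONDS)
        }

        @Override
        int compareTo(Delayed o) {
            // Earlier timestamps come out of the queue first
            return this.timestamp - o.timestamp
        }

        // Copy of this entry with a new timestamp
        protected Object clone(int timestamp) {
            return new ReplayLog(timestamp, this.url)
        }
    }
}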
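
The Delayed contract that ReplayLog relies on, together with the timestamp shift done by the producer, can be tried out in isolation. The following is a minimal Java sketch under stated assumptions, not the engine's own code: `ReplaySketch`, `TimedRequest`, `rebase`, and the recorded timestamps are hypothetical names and made-up values, and timestamps are assumed to be in seconds. It shows `getDelay` converting to the unit that `DelayQueue` requests, and entries leaving the queue in timestamp order once they are due.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class ReplaySketch {

    /** Hypothetical log entry: a URL plus the epoch second at which it should be sent. */
    static class TimedRequest implements Delayed {
        final long fireAtSeconds;
        final String url;

        TimedRequest(long fireAtSeconds, String url) {
            this.fireAtSeconds = fireAtSeconds;
            this.url = url;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            long remainingSeconds = fireAtSeconds - System.currentTimeMillis() / 1000;
            // DelayQueue calls this with NANOSECONDS, so convert instead of returning raw seconds.
            return unit.convert(remainingSeconds, TimeUnit.SECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof TimedRequest) {
                return Long.compare(fireAtSeconds, ((TimedRequest) other).fireAtSeconds);
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Shifts a recorded timestamp so that the first recorded entry fires at nowSeconds. */
    static long rebase(long recorded, long firstRecorded, long nowSeconds) {
        return recorded + (nowSeconds - firstRecorded);
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis() / 1000;
        long first = 1_600_000_000L; // made-up timestamp of the first recorded entry

        DelayQueue<TimedRequest> queue = new DelayQueue<>();
        // Added out of order on purpose; the queue orders entries by compareTo.
        queue.add(new TimedRequest(rebase(first + 1, first, now), "/b"));
        queue.add(new TimedRequest(rebase(first, first, now), "/a"));

        System.out.println(queue.take().url); // prints "/a" immediately
        System.out.println(queue.take().url); // prints "/b" about one second later
    }
}
```

Returning the raw second count from `getDelay` still releases entries at the right time, because `DelayQueue` only checks whether the value is positive. The waiting thread would then treat the value as nanoseconds and wake up far more often than needed.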
Self-test
package com.okcoin.hickwall.presses

import com.okcoin.hickwall.presses.funtester.frame.execute.ReplayConcurrent
import com.okcoin.hickwall.presses.funtester.httpclient.FunHttp

class RplayT extends FunHttp {

    static String HOST = "http://localhost:12345"

    public static void main(String[] args) {
        def fileName = "api.log"
        new ReplayConcurrent("测试回放功能", fileName, 1, {String url ->
            getHttpResponse(getHttpGet(HOST + url))
        }).start()
    }
}
22:45:43.510 main
###### # # # # ####### ###### ##### ####### ###### #####
# # # ## # # # # # # # #
#### # # # # # # #### ##### # #### #####
# # # # # # # # # # # # #
# ##### # # # ###### ##### # ###### # #
10:56:18 F-5 测试回放功能, 实际QPS:23162 活跃线程数:0单线程效率:23162
10:56:23 F-5 测试回放功能, 实际QPS:36575 活跃线程数:6单线程效率:6095
10:56:28 F-5 测试回放功能, 实际QPS:38974 活跃线程数:21单线程效率:1855
10:56:33 F-5 测试回放功能, 实际QPS:32798 活跃线程数:8单线程效率:4099
10:56:38 F-5 测试回放功能,实际QPS:35224 活跃线程数:4单线程效率:8806
10:56:43 F-5 测试回放功能,实际QPS:28426 活跃线程数:0单线程效率:28426
10:56:48 F-5 测试回放功能, 实际QPS:33607 活跃线程数:6单线程效率:5601
10:56:53 F-5 测试回放功能,实际QPS:34392 活跃线程数:0单线程效率:34392
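
The two figures in each log line, 实际QPS (actual QPS) and 单线程效率 (per-thread throughput), come from the statistics loop in `start()`. The Java sketch below reproduces that arithmetic as a rough illustration. `QpsSketch`, `qps`, and `perThread` are hypothetical names. The 5-second interval is an assumption read off the spacing of the log timestamps, and the request count is a made-up value chosen to match the second log line.

```java
import java.util.concurrent.atomic.LongAdder;

public class QpsSketch {

    /** Requests per second over one reporting interval; also resets the counter. */
    static int qps(LongAdder total, int intervalSeconds) {
        return (int) (total.sumThenReset() / intervalSeconds);
    }

    /** QPS divided by the active thread count; an idle pool (0 active) counts as one thread. */
    static int perThread(int qps, int activeThreads) {
        return qps / Math.max(activeThreads, 1);
    }

    public static void main(String[] args) {
        LongAdder total = new LongAdder();
        total.add(182_875); // made-up number of requests counted during one interval

        int real = qps(total, 5); // assumed 5-second interval
        System.out.println(real);               // prints 36575
        System.out.println(perThread(real, 6)); // prints 6095
        System.out.println(total.sum());        // prints 0, the counter was reset
    }
}
```

Because an idle pool is counted as one thread, lines that report 0 active threads show a per-thread figure equal to the QPS itself.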