Skip to content

Commit

Permalink
Merge pull request #515 from jifengzhilong/master
Browse files Browse the repository at this point in the history
trans
  • Loading branch information
sadadw1 authored Jul 4, 2023
2 parents 5d81b55 + a9ab9d6 commit bd4c3c8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ public class ConsumerService {
private StringBuilder secondBatchRocksMessage = new StringBuilder();

/**
* 控制rocksDB每批存入的消息数量,防止单个key消息过多导致的内存溢出
* Control the number of rocksDB messages stored in each batch
* to prevent memory overflow caused by too many single key messages
*/
private int firstCount = 0;
private int secondCount = 0;
private static final int BATCH_ROCKSDB_COUNT = 20;
/**
* first与second锁隔离
* The first lock is isolated from the second lock
*/
private static final Object FIRST_LOCK = new Object();
private static final Object SECOND_LOCK = new Object();
Expand All @@ -79,9 +80,9 @@ public void init() {
if (filterIsOpen) {
firstRocksdbStoreService = new RocksdbStoreServiceImpl(firstRocksPath, TeSnowFlake.FIRST_TIMESTAMP_REDIS_PREFIX);
secondRocksdbStoreService = new RocksdbStoreServiceImpl(secondRocksPath, TeSnowFlake.SECOND_TIMESTAMP_REDIS_PREFIX);
// 初始化第一次读取rocksdb任务
// Initialize the rocksdb task for the first time
initFirstRocksTask();
// 初始化第二次读取rocksdb任务
// Initializes the second read rocksdb task
initSecondRocksTask();
}
}
Expand All @@ -90,15 +91,14 @@ private void dealMessage(String order, String message) {
if (StringUtils.isEmpty(message)) {
return;
}
// 存入Rocksdb的消息体为 traceId ### serviceName ### spanName ### TSpanData(String) #### ......
// 过滤
// The body of the message stored in Rocksdb is: traceId ### serviceName ### spanName ### TSpanData(String) #### ......
String[] messages = message.split(MessageUtil.ROCKS_SPLIT);
for (String oneMessage : messages) {
String[] split = oneMessage.split(MessageUtil.SPLIT);
TSpanData tSpanData = deserializeFromString(split[3]);
if (tSpanData != null) {
if (traceIdRedisBloomUtil.isExistLocal(split[0])) {
// 写入es
// write into es
writeEsService.insertJaegerSpan(tSpanData, split[1], split[2]);
} else if (RocksdbStoreServiceImpl.FIRST_ORDER.equals(order)) {
insertRocks(split[0], split[1], split[2], tSpanData, RocksdbStoreServiceImpl.SECOND_ORDER);
Expand Down Expand Up @@ -133,7 +133,7 @@ public void consumer(TSpanData tSpanData) {
if (tSpanData.getExtra() != null && StringUtils.isNotEmpty(tSpanData.getExtra().getServiceName())) {
serviceName = tSpanData.getExtra().getServiceName();
}
// 过滤
// filter
String traceId = tSpanData.getTraceId();
String spanName = tSpanData.getName();
Long duration = tSpanData.getEndEpochNanos() - tSpanData.getStartEpochNanos();
Expand All @@ -143,10 +143,10 @@ public void consumer(TSpanData tSpanData) {
}
if (filter.isResult()) {
if (filter.isAddBloom()) {
// 插入bloomfilter
// inert bloomfilter
traceIdRedisBloomUtil.addBatch(traceId);
}
// 写入es
// write into es
writeEsService.insertJaegerSpan(tSpanData, serviceName, spanName);
} else {
insertRocks(traceId, serviceName, spanName, tSpanData, RocksdbStoreServiceImpl.FIRST_ORDER);
Expand All @@ -173,7 +173,7 @@ private void insertRocks(String traceId, String serviceName, String spanName, TS

private void internatInset(String traceId, String serviceName, String spanName, TSpanData tSpanData, String order) {
buildRocksDBMessage(traceId, serviceName, spanName, tSpanData, order);
// 判断秒级匹配
// Check the second level match
long currSeconds = System.currentTimeMillis() / 1000;
if (RocksdbStoreServiceImpl.FIRST_ORDER.equals(order)) {
if (LocalStorages.firstCurrentSeconds != currSeconds || firstCount >= BATCH_ROCKSDB_COUNT) {
Expand Down Expand Up @@ -214,11 +214,11 @@ private void buildRocksDBMessage(String traceId, String serviceName, String span
}

private void initFirstRocksTask() {
// 获取上一次消息读取的时间戳
// Gets the timestamp of the last message read
String firstKey = snowFlake.recoverLastTimestamp(TeSnowFlake.FIRST_TIMESTAMP_REDIS_PREFIX);
final String firstLastRocksKey = firstKey == null ?
System.currentTimeMillis() + "_" + LocalStorages.firstRocksKeySuffix.get() : firstKey;
// 第一次读取本地消息线程
// The local message thread is read for the first time
ExecutorUtil.submitRocksDBRead(() -> {
try {
firstRocksdbStoreService.delayTake(firstLastRocksKey, firstGap, new Consumer<byte[]>() {
Expand All @@ -241,11 +241,11 @@ public void accept(byte[] bytes) {
}

private void initSecondRocksTask() {
// 获取上一次消息读取的时间戳
// Gets the timestamp of the last message read
String secondKey = snowFlake.recoverLastTimestamp(TeSnowFlake.SECOND_TIMESTAMP_REDIS_PREFIX);
final String secondLastRocksKey = secondKey == null ?
System.currentTimeMillis() + "_" + LocalStorages.secondRocksKeySuffix.get() : secondKey;
// 第二次读取本地消息线程
// The local message thread is read for the sencond time
ExecutorUtil.submitRocksDBRead(() -> {
try {
secondRocksdbStoreService.delayTake(secondLastRocksKey, secondGap, new Consumer<byte[]>() {
Expand Down Expand Up @@ -281,7 +281,7 @@ private String serializeToString(TSpanData tSpanData) {
private TSpanData deserializeFromString(String decode) {
try {
TSpanData tSpanData = new TSpanData();
// 使用ISO-8859-1编码方式防止byte[]转String时,字符集额外处理造成byte[]不一致,从而导致thrift反序列化字段缺失
// The ISO-8859-1 encoding prevents byte[] inconsistency caused by extra character set processing when byte[] is converted to String, resulting in missing thrift deserialization fields
new TDeserializer(ThriftUtil.PROTOCOL_FACTORY).deserialize(tSpanData, decode.getBytes(StandardCharsets.ISO_8859_1));
return tSpanData;
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public String toJaegerTrace(String message) {
}

public byte[] toTSpanDateBytes(String message) {
// 将nginx日志转换为NginxJaegerDomain, 这由具体实现类实现,可以每种nginx日志对应不同实现
// The nginx logs are converted to NginxJaegerDomain, which is implemented by the concrete implementation class and can be implemented differently for each nginx log
NginxJaegerDomain parse = parse(message);
if (parse != null) {
try {
Expand Down Expand Up @@ -203,7 +203,7 @@ protected String generateSpanId() {
}

/**
* 去掉请求参数
* delete request params
* @param requestUri
* @return
*/
Expand Down

0 comments on commit bd4c3c8

Please sign in to comment.