Skip to content

Commit a07ba5c

Browse files
committed
在kafka插件中检查出心跳机制的问题,调整为发送心跳包后服务端回复一个包,当客户端长期未收到包则自动重启
1 parent efe61c6 commit a07ba5c

File tree

9 files changed

+160
-27
lines changed

9 files changed

+160
-27
lines changed

addons-kafka/src/main/java/org/wowtools/hppt/addons/kafka/KafkaClientSessionService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,22 @@
77

88
/**
99
* 客户端,部署在电脑A上
10+
*
1011
* @author liuyu
1112
* @date 2024/6/15
1213
*/
1314
public class KafkaClientSessionService extends ClientSessionService {
14-
//TODO 传输文件等大字节数传播的情况下,需处理kafka字节顺序消费问题
1515
public KafkaClientSessionService(ScConfig config) throws Exception {
1616
super(config);
1717
}
1818

1919
private KafkaUtil.BytesFunction sendToServer;
2020
private KafkaUtil.BytesFunction clientConsumer;
21+
2122
@Override
2223
public void connectToServer(ScConfig config, Cb cb) throws Exception {
2324
//初始化时构造好向kafka生产和消费数据的工具
24-
sendToServer = KafkaUtil.buildProducer(KafkaUtil.config.clientSendTopic);
25+
sendToServer = KafkaUtil.buildProducer(KafkaUtil.config.clientSendTopic, false);
2526

2627
clientConsumer = (bytes) -> {
2728
//消费到客户端的数据,调用receiveServerBytes方法来接收
@@ -31,7 +32,7 @@ public void connectToServer(ScConfig config, Cb cb) throws Exception {
3132
throw new RuntimeException(e);
3233
}
3334
};
34-
KafkaUtil.buildConsumer("client", KafkaUtil.config.serverSendTopic, clientConsumer);
35+
KafkaUtil.buildConsumer("client", KafkaUtil.config.serverSendTopic, clientConsumer, false);
3536
cb.end(null);//调用end方法,通知框架连接完成
3637
}
3738

@@ -40,7 +41,7 @@ public void sendBytesToServer(byte[] bytes) {
4041
sendToServer.f(bytes);
4142
}
4243

43-
public static void main(String[] args) throws Exception{
44+
public static void main(String[] args) throws Exception {
4445
ScConfig cfg = new ScConfig();
4546
cfg.clientUser = "user1";
4647
cfg.clientPassword = "12345";

addons-kafka/src/main/java/org/wowtools/hppt/addons/kafka/KafkaServerSessionService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ public KafkaServerSessionService(SsConfig ssConfig) {
3030
@Override
3131
protected void init(SsConfig ssConfig) throws Exception {
3232
//初始化时构造好向kafka生产和消费数据的工具
33-
sendToClient = KafkaUtil.buildProducer(KafkaUtil.config.serverSendTopic);
33+
sendToClient = KafkaUtil.buildProducer(KafkaUtil.config.serverSendTopic, true);
3434

3535
clientConsumer = (bytes) -> {
3636
//消费到客户端的数据,调用receiveClientBytes方法来接收
3737
receiveClientBytes(singleCtx, bytes);
3838
};
39-
KafkaUtil.buildConsumer("server", KafkaUtil.config.clientSendTopic, clientConsumer);
39+
KafkaUtil.buildConsumer("server", KafkaUtil.config.clientSendTopic, clientConsumer, true);
4040
}
4141

4242
@Override
@@ -54,7 +54,7 @@ protected void onExit() throws Exception {
5454
//TODO 关闭kafka生产者和消费者
5555
}
5656

57-
public static void main(String[] args) throws Exception{
57+
public static void main(String[] args) throws Exception {
5858
SsConfig cfg = new SsConfig();
5959
SsConfig.Client client = new SsConfig.Client();
6060
client.user = "user1";

addons-kafka/src/main/java/org/wowtools/hppt/addons/kafka/KafkaUtil.java

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,20 @@
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
66
import lombok.extern.slf4j.Slf4j;
7+
import org.apache.kafka.clients.admin.*;
8+
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
79
import org.apache.kafka.clients.consumer.ConsumerRecord;
810
import org.apache.kafka.clients.consumer.ConsumerRecords;
911
import org.apache.kafka.clients.consumer.KafkaConsumer;
1012
import org.apache.kafka.clients.producer.KafkaProducer;
1113
import org.apache.kafka.clients.producer.Producer;
1214
import org.apache.kafka.clients.producer.ProducerRecord;
15+
import org.apache.kafka.common.KafkaFuture;
1316
import org.apache.kafka.common.TopicPartition;
1417
import org.wowtools.hppt.common.util.ResourcesReader;
1518

1619
import java.time.Duration;
17-
import java.util.Arrays;
18-
import java.util.Collections;
19-
import java.util.List;
20-
import java.util.Properties;
20+
import java.util.*;
2121

2222
/**
2323
* kafka工具类
@@ -54,26 +54,93 @@ public interface BytesFunction {
5454
/**
5555
* 构造一个向指定topic发送bytes数据的工具
5656
*
57-
* @param topic 主题
57+
* @param topic 主题
58+
* @param recreate 为true则重建一个 1个分区、1个副本的topic
5859
* @return BytesFunction 调用其f(byte[] bytes)方法发送数据
5960
*/
60-
public static BytesFunction buildProducer(String topic) {
61-
Producer<String, byte[]> producer = new KafkaProducer<>(buildProperties());
61+
public static BytesFunction buildProducer(String topic, boolean recreate) {
62+
Properties c = buildProperties();
63+
if (recreate) {
64+
recreateTopic(c, topic);
65+
}
66+
Producer<String, byte[]> producer = new KafkaProducer<>(c);
6267
return (bytes -> {
63-
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic,0,"x", bytes);
68+
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, 0, "x", bytes);
6469
producer.send(record);
6570
});
6671
}
6772

73+
private static void recreateTopic(Properties c, String topic) {
74+
try (AdminClient adminClient = AdminClient.create(c)) {
75+
try {
76+
// 获取当前所有 topic
77+
Set<String> topicNames = adminClient.listTopics().names().get();
78+
// 如果 topic 存在,则删除
79+
if (topicNames.contains(topic)) {
80+
log.info("Topic exists. Deleting: " + topic);
81+
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topic));
82+
deleteTopicsResult.all().get();
83+
// 等待 topic 删除彻底(视情况可调整)
84+
Thread.sleep(3000);
85+
}
86+
// 创建新的 topic
87+
NewTopic newTopic = new NewTopic(topic, 1, (short) 1); // 1个分区,1个副本
88+
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
89+
createTopicsResult.all().get();
90+
log.info("Topic created successfully: " + topic);
91+
} catch (Exception e) {
92+
log.warn("删除topic失败,尝试清除数据 {}", topic, e);
93+
// 获取 topic 的 partition 列表
94+
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(List.of(topic));
95+
List<TopicPartition> partitions = describeTopicsResult.values().get(topic)
96+
.get()
97+
.partitions()
98+
.stream()
99+
.map(p -> new TopicPartition(topic, p.partition()))
100+
.toList();
101+
102+
// 获取每个 partition 的最新 offset
103+
Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>();
104+
for (TopicPartition tp : partitions) {
105+
offsetSpecs.put(tp, OffsetSpec.latest());
106+
}
107+
Map<TopicPartition, ListOffsetsResultInfo> offsetResults =
108+
adminClient.listOffsets(offsetSpecs).all().get();
109+
110+
// 构造 RecordsToDelete 请求
111+
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
112+
for (TopicPartition tp : partitions) {
113+
long offset = offsetResults.get(tp).offset();
114+
recordsToDelete.put(tp, RecordsToDelete.beforeOffset(offset));
115+
}
116+
117+
// 执行 deleteRecords
118+
DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
119+
// 遍历所有分区的结果
120+
for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : deleteRecordsResult.lowWatermarks().entrySet()) {
121+
TopicPartition tp = entry.getKey();
122+
DeletedRecords deleted = entry.getValue().get(); // get() 是 KafkaFuture.get()
123+
log.info("Partition {} deleted up to offset {}", tp, deleted.lowWatermark());
124+
}
125+
}
126+
} catch (Exception e) {
127+
log.warn("清理topic失败 {}", topic, e);
128+
}
129+
}
130+
68131
/**
69132
* 消费kafka数据
70133
*
71-
* @param groupId 消费者组
72-
* @param topic 主题
73-
* @param cb 消费到字节时回调
134+
* @param groupId 消费者组
135+
* @param topic 主题
136+
* @param recreate 为true则重建一个 1个分区、1个副本的topic
137+
* @param cb 消费到字节时回调
74138
*/
75-
public static void buildConsumer(String groupId, String topic, BytesFunction cb) {
139+
public static void buildConsumer(String groupId, String topic, BytesFunction cb, boolean recreate) {
76140
Properties props = buildProperties();
141+
if (recreate) {
142+
recreateTopic(props, topic);
143+
}
77144
props.put("group.id", groupId + "-" + config.tag);
78145
props.put("auto.offset.reset", "latest");
79146
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
@@ -100,6 +167,11 @@ public static void buildConsumer(String groupId, String topic, BytesFunction cb)
100167
byte[] value = record.value();
101168
cb.f(value);
102169
}
170+
try {
171+
Thread.sleep(10);
172+
} catch (InterruptedException e) {
173+
throw new RuntimeException(e);
174+
}
103175
}
104176
});
105177
}

run/src/main/java/org/wowtools/hppt/common/client/ClientTalker.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.LinkedList;
1010
import java.util.List;
1111
import java.util.Map;
12-
import java.util.concurrent.BlockingQueue;
1312
import java.util.concurrent.TimeUnit;
1413

1514
/**
@@ -19,6 +18,10 @@
1918
@Slf4j
2019
public class ClientTalker {
2120

21+
public interface CommandCallBack {
22+
void cb(char type, String param) throws Exception;
23+
}
24+
2225
/**
2326
* 将缓冲区的数据转为满足向服务端发送的字节
2427
*
@@ -97,7 +100,7 @@ public static byte[] buildSendToServerBytes(CommonConfig config, long maxSendBod
97100
//接收服务端发来的字节并做相应处理
98101
public static boolean receiveServerBytes(CommonConfig config, byte[] responseBody,
99102
ClientSessionManager clientSessionManager, AesCipherUtil aesCipherUtil, BufferPool<String> sendCommandQueue,
100-
Map<Integer, ClientBytesSender.SessionIdCallBack> sessionIdCallBackMap) throws Exception {
103+
Map<Integer, ClientBytesSender.SessionIdCallBack> sessionIdCallBackMap, CommandCallBack commandCallBack) throws Exception {
101104
if (null == responseBody) {
102105
return true;
103106
}
@@ -155,6 +158,15 @@ public static boolean receiveServerBytes(CommonConfig config, byte[] responseBod
155158
sendCommandQueue.add(String.valueOf(Constant.SsCommands.CloseSession) + sessionId);
156159
}
157160
}
161+
default -> {
162+
if (null != commandCallBack) {
163+
try {
164+
commandCallBack.cb(type, command.substring(1));
165+
} catch (Exception e) {
166+
log.warn("服务端命令处理错误 {}", command, e);
167+
}
168+
}
169+
}
158170
}
159171
}
160172
}

run/src/main/java/org/wowtools/hppt/common/server/ServerTalker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ private static void receiveClientCommand(String command,
102102
case Constant.SsCommands.Heartbeat -> {
103103
log.debug("收到客户端心跳 {}", client.clientId);
104104
serverSessionManager.setLastHeartbeatTime(System.currentTimeMillis());
105+
//回发一个心跳包
106+
client.addCommand(Constant.ScCommands.Heartbeat + ":" + System.currentTimeMillis());
105107
}
106108
}
107109
}

run/src/main/java/org/wowtools/hppt/run/sc/common/PortReceiver.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ final class PortReceiver implements Receiver {
4141
private boolean noLogin = true;
4242

4343
private volatile boolean running = true;
44+
//服务端回复心跳包的时间
45+
private long serverHeartbeatTime = System.currentTimeMillis();
46+
47+
private final ClientTalker.CommandCallBack commandCallBack = (type, param) -> {
48+
if (Constant.ScCommands.Heartbeat == type) {
49+
log.info("收到服务端心跳包");
50+
serverHeartbeatTime = System.currentTimeMillis();
51+
}
52+
};
4453

4554

4655
public PortReceiver(ScConfig config, ClientSessionService clientSessionService) throws Exception {
@@ -66,7 +75,7 @@ public PortReceiver(ScConfig config, ClientSessionService clientSessionService)
6675
clientSessionService.sendBytesToServer(GridAesCipherUtil.encrypt("dt".getBytes(StandardCharsets.UTF_8)));
6776
//等待时间戳返回
6877
int n = 0;
69-
while (null == dt) {
78+
while (null == dt && clientSessionService.running) {
7079
try {
7180
Thread.sleep(100);
7281
} catch (InterruptedException e) {
@@ -84,18 +93,30 @@ public PortReceiver(ScConfig config, ClientSessionService clientSessionService)
8493
checkSessionInit();
8594
});
8695

87-
//发送心跳包
96+
//心跳检测
8897
if (config.heartbeatPeriod > 0) {
8998
Thread.startVirtualThread(() -> {
99+
try {
100+
Thread.sleep(config.heartbeatPeriod / 3);
101+
} catch (InterruptedException e) {
102+
}
90103
while (running) {
104+
//发送心跳包
105+
sendCommandQueue.add(Constant.SsCommands.Heartbeat + ":" + System.currentTimeMillis());
106+
//检测服务端心跳回复
107+
if (System.currentTimeMillis() - serverHeartbeatTime > config.heartbeatPeriod * 1.5) {
108+
log.warn("长期未收到服务端心跳,疑似故障,重启");
109+
clientSessionService.exit();
110+
}
111+
log.info("心跳检测正常");
91112
try {
92113
Thread.sleep(config.heartbeatPeriod);
93114
} catch (InterruptedException e) {
94115
continue;
95116
}
96-
sendCommandQueue.add(Constant.SsCommands.Heartbeat + ":" + System.currentTimeMillis());
97117
}
98118
});
119+
99120
}
100121

101122
});
@@ -144,7 +165,7 @@ public void receiveServerBytes(byte[] bytes) throws Exception {
144165
log.warn("未知命令 {}", s);
145166
}
146167
} else {
147-
ClientTalker.receiveServerBytes(config, bytes, clientSessionManager, aesCipherUtil, sendCommandQueue, sessionIdCallBackMap);
168+
ClientTalker.receiveServerBytes(config, bytes, clientSessionManager, aesCipherUtil, sendCommandQueue, sessionIdCallBackMap, commandCallBack);
148169
}
149170
}
150171

run/src/main/java/org/wowtools/hppt/run/sc/pojo/ScConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,9 @@ public static final class FileConfig {
194194

195195

196196
/**
197-
* 心跳周期,若此值大于0,定期向服务端发送心跳包
197+
* 心跳周期 毫秒,若此值大于0,定期向服务端发送心跳包
198198
*/
199-
public long heartbeatPeriod = -1;
199+
public long heartbeatPeriod = 120_000;
200200

201201

202202
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
type: 'org.wowtools.hppt.addons.kafka.KafkaClientSessionService'
2+
3+
localHost: 127.0.0.1
4+
# 客户端用户名,每个sc进程用一个,不要重复
5+
clientUser: user1
6+
# 客户端密码
7+
clientPassword: 12345
8+
9+
#是否启用内容加密,默认启用 需和服务端保持一致
10+
enableEncrypt: true
11+
12+
forwards:
13+
- localPort: 50022
14+
remoteHost: "wsl"
15+
remotePort: 22
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# 通讯协议 客户端与服务端保持一致
2+
type: 'org.wowtools.hppt.addons.kafka.KafkaServerSessionService'
3+
4+
5+
# 允许的客户端账号和密码
6+
clients:
7+
- user: user1
8+
password: 12345
9+
- user: user2
10+
password: 112233

0 commit comments

Comments
 (0)