
enhancement: prevent UI freezing with large messages and single-message downloading #970

Merged: 16 commits, Jan 5, 2022 (changes shown from 14 commits)

Commits
9afdaf3 Fix: IndexOutOfBoundsException thrown in topics view when changing to… (EdwinFajardoBarrera, Dec 22, 2021)
b16e0af Fixing bug of outOfBoundsException when in ConnectList component (EdwinFajardoBarrera, Dec 22, 2021)
f5ede0c Merge branch 'tchiotludo:dev' into dev (EdwinFajardoBarrera, Dec 22, 2021)
c336510 Merge branch 'dev' of https://github.com/EdwinFajardoBarrera/akhq int… (EdwinFajardoBarrera, Dec 22, 2021)
40c7082 Merge branch 'tchiotludo:dev' into dev (EdwinFajardoBarrera, Dec 22, 2021)
4b6b654 fix: Reference to Record is ambiguous #965 (EdwinFajardoBarrera, Dec 22, 2021)
7fef739 Merge branch 'tchiotludo:dev' into dev (EdwinFajardoBarrera, Dec 23, 2021)
6ff01fb Merge branch 'tchiotludo:dev' into dev (EdwinFajardoBarrera, Dec 28, 2021)
6bb0ba2 enhancement: Displaying large messages in topics is stalling the UI a… (EdwinFajardoBarrera, Dec 29, 2021)
9877419 Fixed varible maxKafkaMessageLength (EdwinFajardoBarrera, Jan 3, 2022)
2d032bb Changed kafka-max-message-length config variable to application.yml a… (EdwinFajardoBarrera, Jan 3, 2022)
261387f Merge branch 'dev' of https://github.com/EdwinFajardoBarrera/akhq int… (EdwinFajardoBarrera, Jan 3, 2022)
467e369 Solved conflict (EdwinFajardoBarrera, Jan 3, 2022)
77ec471 Update application.yml (EdwinFajardoBarrera, Jan 3, 2022)
839196e Moved kafka-max-message-length var to topic-data group, removed @Sett… (EdwinFajardoBarrera, Jan 4, 2022)
8ff919f Moved kafka-max-message-length var to topic-data group, removed @Sett… (EdwinFajardoBarrera, Jan 4, 2022)
14 changes: 13 additions & 1 deletion client/src/components/Table/Table.jsx
@@ -288,7 +288,7 @@ class Table extends Component {
   }

   renderActions(row) {
-    const { actions, onAdd, onDetails, onConfig, onDelete, onEdit, onRestart, onShare, idCol } = this.props;
+    const { actions, onAdd, onDetails, onConfig, onDelete, onEdit, onRestart, onShare, onDownload, idCol } = this.props;

     let idColVal = idCol ? row[this.props.idCol] : row.id;

@@ -374,6 +374,18 @@ class Table extends Component {
           </span>
         </td>
       )}
+      {actions.find(el => el === constants.TABLE_DOWNLOAD) && (
+        <td className="khq-row-action khq-row-action-main action-hover">
+          <span title="Download"
+            id="download"
+            onClick={() => {
+              onDownload && onDownload(row);
+            }}
+          >
+            <i className="fa fa-download" />
+          </span>
+        </td>
+      )}
     </>
   );
 }
31 changes: 28 additions & 3 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
@@ -64,7 +64,8 @@ class TopicData extends Root {
     roles: JSON.parse(sessionStorage.getItem('roles')),
     canDeleteRecords: false,
     percent: 0,
-    loading: true
+    loading: true,
+    canDownload: false
   };

   searchFilterTypes = [
@@ -113,6 +114,7 @@ class TopicData extends Root {
     () => {
       if(query.get('single') !== null) {
         this._getSingleMessage(query.get('partition'), query.get('offset'));
+        this.setState({ canDownload: true })
       } else {
         this._getMessages();
       }
@@ -346,10 +348,23 @@ class TopicData extends Root {
       console.error('Failed to copy: ', err);
     }

+    this.setState({ canDownload: true })
+
     this.props.history.push(pathToShare)
     this._getSingleMessage(row.partition, row.offset);
   }

+  _handleDownload({ key, value: data }) {
+    const hasKey = key && key !== null && key !== 'null';
+
+    const a = document.createElement('a');
+    a.href = URL.createObjectURL( new Blob([data], { type:'text/json' }) );
+    a.download = `${hasKey ? key : 'file'}.json`;
+
+    a.click();
+    a.remove();
+  }
+
   _showDeleteModal = deleteMessage => {
     this.setState({ showDeleteModal: true, deleteMessage });
   };
@@ -383,7 +398,9 @@ class TopicData extends Root {
     messages.forEach(message => {
       let messageToPush = {
         key: message.key || 'null',
-        value: message.value || 'null',
+        value: message.truncated
+          ? message.value + '...\nToo large message. Full body in share button.' || 'null'
+          : message.value || 'null',
         timestamp: message.timestamp,
         partition: JSON.stringify(message.partition) || '',
         offset: JSON.stringify(message.offset) || '',
@@ -642,9 +659,14 @@ class TopicData extends Root {
       datetime,
       isSearching,
       canDeleteRecords,
+      canDownload,
       percent,
       loading
     } = this.state;

+    let actions = canDeleteRecords ? [constants.TABLE_DELETE, constants.TABLE_SHARE] : [constants.TABLE_SHARE]
+    if (canDownload) actions.push(constants.TABLE_DOWNLOAD)
+
     let date = moment(datetime);
     const { history } = this.props;
     const firstColumns = [
@@ -965,7 +987,10 @@ class TopicData extends Root {
       onShare={row => {
         this._handleOnShare(row);
       }}
-      actions={canDeleteRecords ? [constants.TABLE_DELETE, constants.TABLE_SHARE] : [constants.TABLE_SHARE]}
+      onDownload={row => {
+        this._handleDownload(row);
+      }}
+      actions={actions}
       onExpand={obj => {
         return Object.keys(obj.headers).map(header => {
           return (
2 changes: 2 additions & 0 deletions client/src/utils/constants.js
@@ -24,6 +24,7 @@ export const TABLE_DETAILS = 'details';
 export const TABLE_CONFIG = 'config';
 export const TABLE_RESTART = 'restart';
 export const TABLE_SHARE = 'share';
+export const TABLE_DOWNLOAD = 'download'

 // Tab names/route names
 export const CLUSTER = 'cluster';
@@ -65,6 +66,7 @@ export default {
   TABLE_CONFIG,
   TABLE_RESTART,
   TABLE_SHARE,
+  TABLE_DOWNLOAD,
   CLUSTER,
   NODE,
   TOPIC,
15 changes: 15 additions & 0 deletions src/main/java/org/akhq/models/Record.java
@@ -8,6 +8,7 @@
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.micronaut.context.annotation.Value;
 import kafka.coordinator.group.GroupMetadataManager;
 import kafka.coordinator.transaction.TransactionLog;
 import kafka.coordinator.transaction.TxnKey;
@@ -71,12 +72,16 @@ public class Record {
     private byte[] bytesValue;

     @Getter(AccessLevel.NONE)
+    @Setter(AccessLevel.NONE)
     private String value;

     private final List<String> exceptions = new ArrayList<>();

     private byte MAGIC_BYTE;

+    @Setter(AccessLevel.NONE)
+    private Boolean truncated;
+
     public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, Map<String, String> headers, Topic topic) {
         this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
         this.topic = topic;
@@ -88,6 +93,7 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
         this.bytesValue = bytesValue;
         this.valueSchemaId = getAvroSchemaId(this.bytesValue);
         this.headers = headers;
+        this.truncated = false;
     }

     public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
@@ -118,6 +124,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
         this.kafkaProtoDeserializer = kafkaProtoDeserializer;
         this.avroToJsonSerializer = avroToJsonSerializer;
         this.kafkaJsonDeserializer = kafkaJsonDeserializer;
+        this.truncated = false;
     }

     public String getKey() {
@@ -145,6 +152,14 @@ public String getValue() {
         return this.value;
     }

+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    public void setTruncated(Boolean truncated) {
+        this.truncated = truncated;
+    }
+
     private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
         if (payload == null) {
             return null;
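The frontend change in TopicData.jsx above branches on message.truncated, so the new flag is only useful if it is serialized into the API response alongside value. A minimal sketch of that payload shape, assuming Jackson as the JSON serializer and using an illustrative DTO rather than the real org.akhq.models.Record:

import com.fasterxml.jackson.databind.ObjectMapper;

public class TruncatedPayloadSketch {
    // Illustrative DTO only; the real class is org.akhq.models.Record.
    public static class Message {
        public String key = "k";
        public String value = "partial body...";
        public boolean truncated = true;
    }

    public static void main(String[] args) throws Exception {
        // Prints (modulo field order):
        // {"key":"k","value":"partial body...","truncated":true}
        // i.e. the shape TopicData.jsx checks via message.truncated.
        System.out.println(new ObjectMapper().writeValueAsString(new Message()));
    }
}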
15 changes: 15 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -33,6 +33,7 @@
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.codehaus.httpcache4j.uri.URIBuilder;

+import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -76,6 +77,9 @@ public class RecordRepository extends AbstractRepository {
     @Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}")
     protected int maxPollRecords;

+    @Value("${akhq.kafka-max-message-length}")
+    private int maxKafkaMessageLength;
+
     public Map<String, Record> getLastRecord(String clusterId, List<String> topicsName) throws ExecutionException, InterruptedException {
         Map<String, Topic> topics = topicRepository.findByName(clusterId, topicsName).stream()
             .collect(Collectors.toMap(Topic::getName, Function.identity()));
@@ -153,6 +157,7 @@ private List<Record> consumeOldest(Topic topic, Options options) {
         for (ConsumerRecord<byte[], byte[]> record : records) {
             Record current = newRecord(record, options, topic);
             if (searchFilter(options, current)) {
+                filterMessageLength(current);
                 list.add(current);
             }
         }
@@ -311,6 +316,7 @@ private List<Record> consumeNewest(Topic topic, Options options) {
             }
             Record current = newRecord(record, options, topic);
             if (searchFilter(options, current)) {
+                filterMessageLength(current);
                 list.add(current);
             }
         }
@@ -1268,5 +1274,14 @@ private static class EndOffsetBound {
         private final long end;
         private final KafkaConsumer<byte[], byte[]> consumer;
     }
+
+    private void filterMessageLength(Record record) {
+        int bytesLength = record.getValue().getBytes(StandardCharsets.UTF_8).length;
+        if (bytesLength > maxKafkaMessageLength) {
+            int substringChars = maxKafkaMessageLength / 1000;
+            record.setValue(record.getValue().substring(0, substringChars));
+            record.setTruncated(true);
+        }
+    }
 }
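The helper above compares the UTF-8 byte length of the decoded value against akhq.kafka-max-message-length, but keeps only maxKafkaMessageLength / 1000 characters: with the default of 1000000 bytes, an oversized value is cut down to a 1000-character preview. A standalone sketch of that arithmetic, with a null guard added for tombstone records (class and method names here are illustrative, not part of the PR):

import java.nio.charset.StandardCharsets;

public class TruncationSketch {
    // Mirrors the application.yml default: akhq.kafka-max-message-length: 1000000
    static final int MAX_KAFKA_MESSAGE_LENGTH = 1_000_000;

    static String truncate(String value) {
        if (value == null) {
            return null; // tombstones have a null value; filterMessageLength above does not guard this
        }
        int bytesLength = value.getBytes(StandardCharsets.UTF_8).length;
        if (bytesLength > MAX_KAFKA_MESSAGE_LENGTH) {
            // Same rule as filterMessageLength: keep max/1000 characters,
            // i.e. a 1000-character preview with the default setting.
            return value.substring(0, MAX_KAFKA_MESSAGE_LENGTH / 1000);
        }
        return value;
    }

    public static void main(String[] args) {
        String big = "x".repeat(2_000_000); // ~2 MB of ASCII, over the limit
        System.out.println(truncate(big).length()); // prints 1000
    }
}

Note the mixed units: the threshold is measured in bytes, while the preview length is measured in characters.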

1 change: 1 addition & 0 deletions src/main/resources/application.yml
@@ -89,6 +89,7 @@ akhq:
       format: "[Date: {}] [Duration: {} ms] [Url: {} {}] [Status: {}] [Ip: {}] [User: {}]"
     filters:
       - "((?!/health).)*"
+  kafka-max-message-length: 1000000

   clients-defaults:
     consumer:
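RecordRepository injects this setting with a plain @Value("${akhq.kafka-max-message-length}"), so the entry above is mandatory for the application context to start. As a sketch of an alternative (a suggestion, not what the PR ships), Micronaut's ${key:default} placeholder syntax would make the property optional:

import io.micronaut.context.annotation.Value;

// Sketch only, on a hypothetical holder class: the inline default keeps
// deployments without the new application.yml key bootable, falling back
// to the same 1000000-byte limit.
public class MaxMessageLengthSketch {
    @Value("${akhq.kafka-max-message-length:1000000}")
    protected int maxKafkaMessageLength;
}

Operators who want a different preview threshold can still override it with a one-line change, e.g. akhq.kafka-max-message-length: 250000.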