Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-message produce feature #791

Merged
merged 6 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions client/src/components/Form/Form.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ class Form extends Root {
);
};

renderJSONInput = (name, label, onChange) => {
renderJSONInput = (name, label, onChange, textMode, options) => {
const { formData, errors } = this.state;
const inputMode = textMode ? "text" : (formData.schemaType === "PROTOBUF" ? "protobuf" : "json")
return (
<div className="form-group row">
{label !== '' ? (
Expand All @@ -125,7 +126,7 @@ class Form extends Root {
)}
<div className="col-sm-10" style={{ height: '100%' }}>
<AceEditor
mode={ formData.schemaType === "PROTOBUF"? "protobuf" : "json" }
mode={ inputMode }
id={name}
theme="merbivore_soft"
value={formData[name]}
Expand All @@ -134,6 +135,7 @@ class Form extends Root {
}}
name="UNIQUE_ID_OF_DIV"
editorProps={{ $blockScrolling: true }}
setOptions={options}
style={{ width: '100%', minHeight: '25vh' }}
/>
{errors[name] && <div className="alert alert-danger mt-1 p-1">{errors[name]}</div>}
Expand Down Expand Up @@ -238,6 +240,21 @@ class Form extends Root {
</React.Fragment>
);
};

renderCheckbox = (name, label, isChecked, onChange, isDefaultChecked) => {
return (
<input
type="checkbox"
name={name}
id={name}
class="form-input-check"
checked={isChecked}
onChange={onChange}
defaultChecked={ isDefaultChecked ? isDefaultChecked : false}
/>
);
};
}

export default Form;

16 changes: 15 additions & 1 deletion client/src/components/Form/styles.scss
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,18 @@ input.placeholder{
border-bottom-color: transparent !important;
width: 100%;
padding: 0px;
}
}

.ace_active-line {
opacity: 0.2;
}

.ace_placeholder {
font-family: "Source Code Pro",SFMono-Regular,Menlo,Monaco,Consolas,"Liberation Mono","Courier New",monospace;
transform: scale(1);
}

.form-input-check{
margin-block: auto;
}

98 changes: 81 additions & 17 deletions client/src/containers/Topic/TopicProduce/TopicProduce.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class TopicProduce extends Form {
key: '',
hKey0: '',
hValue0: '',
value: ''
value: '',
keyValueSeparator: ':'
},
datetime: new Date(),
openDateModal: false,
Expand All @@ -40,7 +41,9 @@ class TopicProduce extends Form {
valueSchemaSearchValue: '',
selectedValueSchema: '',
clusterId: '',
topicId: ''
topicId: '',
multiMessage: false,
valuePlaceholder: '{"param": "value"}'
};

schema = {
Expand All @@ -58,7 +61,10 @@ class TopicProduce extends Form {
.label('hValue0'),
value: Joi.string()
.allow('')
.label('Value')
.label('Value'),
keyValueSeparator: Joi.string()
.min(1)
.label('keyValueSeparator')
};

async componentDidMount() {
Expand Down Expand Up @@ -98,7 +104,8 @@ class TopicProduce extends Form {
selectedKeySchema,
selectedValueSchema,
keySchema,
valueSchema
valueSchema,
multiMessage
} = this.state;
const { clusterId, topicId } = this.props.match.params;

Expand All @@ -110,9 +117,11 @@ class TopicProduce extends Form {
partition: formData.partition,
key: formData.key,
timestamp: datetime.toISOString(),
value: JSON.parse(JSON.stringify(formData.value)),
value: multiMessage ? formData.value : JSON.parse(JSON.stringify(formData.value)),
keySchema: schemaKeyToSend ? schemaKeyToSend.id : '',
valueSchema: schemaValueToSend ? schemaValueToSend.id : ''
valueSchema: schemaValueToSend ? schemaValueToSend.id : '',
multiMessage: multiMessage,
keyValueSeparator: formData.keyValueSeparator
};

let headers = {};
Expand All @@ -134,6 +143,55 @@ class TopicProduce extends Form {
});
}

renderMultiMessage() {
const { formData, multiMessage } = this.state;

return (
<div className="form-group row">
<label className="col-sm-2 col-form-label">Multi message</label>
<div className="row khq-multiple col-sm-7">
{this.renderCheckbox(
'isMultiMessage',
'',
multiMessage,
() => {
this.setState({multiMessage: !multiMessage,
valuePlaceholder: this.getPlaceholderValue(!multiMessage, formData.keyValueSeparator)})
},
false
)}

<label className="col-auto col-form-label">Separator</label>
<input
type='text'
name='keyValueSeparator'
id='keyValueSeparator'
placeholder=':'
class='col-sm-2 form-control'
disabled={ !multiMessage }
onChange={
event => {
this.setState({
formData: { ...formData,
keyValueSeparator: event.target.value},
valuePlaceholder: this.getPlaceholderValue(!multiMessage, event.target.value)})
}
}
/>
</div>
</div>
);
}

getPlaceholderValue(isMultiMessage, keyValueSeparator) {
if(isMultiMessage) {
return "key1" + keyValueSeparator + "{\"param\": \"value1\"}\n"
+ "key2" + keyValueSeparator + "{\"param\": \"value2\"}";
} else {
return '{"param": "value"}';
}
}

renderHeaders() {
let headers = [];

Expand Down Expand Up @@ -284,13 +342,13 @@ class TopicProduce extends Form {
selectedKeySchema,
valueSchema,
valueSchemaSearchValue,
selectedValueSchema
selectedValueSchema,
multiMessage
} = this.state;
let date = moment(datetime);
return (
<div>
<form encType="multipart/form-data" className="khq-form khq-form-config">
<div>
<Header title={`Produce to ${topicId} `} />
{this.renderSelect('partition', 'Partition', partitions, value => {
this.setState({ formData: { ...formData, partition: value.target.value } });
Expand All @@ -313,11 +371,11 @@ class TopicProduce extends Form {
'key'
)
)}
{this.renderInput('key', 'Key', 'Key', 'Key')}
<div></div>
</div>

{(this.renderInput('key', 'Key', 'Key', 'Key', undefined, undefined, undefined, undefined,{ disabled: multiMessage }))}

{this.renderHeaders()}

{this.renderDropdown(
'Value schema',
valueSchema.map(value => value.subject),
Expand All @@ -336,14 +394,19 @@ class TopicProduce extends Form {
'value'
)
)}

{this.renderMultiMessage()}

{this.renderJSONInput('value', 'Value', value => {
this.setState({
formData: {
...formData,
value: value
}
});
})}
formData: {
...formData,
value: value
}
})},
multiMessage, // true -> 'text' mode; json, protobuff, ... mode otherwise
{ placeholder: this.getPlaceholderValue(multiMessage, formData.keyValueSeparator) }
)}
<div style={{ display: 'flex', flexDirection: 'row', width: '100%', padding: 0 }}>
<label
style={{ padding: 0, alignItems: 'center', display: 'flex' }}
Expand Down Expand Up @@ -429,3 +492,4 @@ class TopicProduce extends Form {
}

export default TopicProduce;

28 changes: 17 additions & 11 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public Topic create(
@Secured(Role.ROLE_TOPIC_DATA_INSERT)
@Post(value = "api/{cluster}/topic/{topicName}/data")
@Operation(tags = {"topic data"}, summary = "Produce data to a topic")
public Record produce(
public List<Record> produce(
HttpRequest<?> request,
String cluster,
String topicName,
Expand All @@ -133,9 +133,12 @@ public Record produce(
Optional<String> timestamp,
Map<String, String> headers,
Optional<Integer> keySchema,
Optional<Integer> valueSchema
Optional<Integer> valueSchema,
Boolean multiMessage,
Optional<String> keyValueSeparator
) throws ExecutionException, InterruptedException {
return new Record(
Topic targetTopic = topicRepository.findByName(cluster, topicName);
return
this.recordRepository.produce(
cluster,
topicName,
Expand All @@ -145,14 +148,16 @@ public Record produce(
partition,
timestamp.map(r -> Instant.parse(r).toEpochMilli()),
keySchema,
valueSchema
),
schemaRegistryRepository.getSchemaRegistryType(cluster),
key.map(String::getBytes).orElse(null),
value.getBytes(),
headers,
topicRepository.findByName(cluster, topicName)
);
valueSchema,
multiMessage,
keyValueSeparator).stream()
.map(recordMetadata -> new Record(recordMetadata,
schemaRegistryRepository.getSchemaRegistryType(cluster),
key.map(String::getBytes).orElse(null),
value.getBytes(),
headers,
targetTopic))
.collect(Collectors.toList());
}

@Secured(Role.ROLE_TOPIC_DATA_READ)
Expand Down Expand Up @@ -504,3 +509,4 @@ public static class OffsetCopy {
private long offset;
}
}

14 changes: 14 additions & 0 deletions src/main/java/org/akhq/models/KeyValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.akhq.models;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* Represents a simple key-value pair of any type
*/
@Getter
@AllArgsConstructor
public class KeyValue<K,V> {
K key;
V value;
}
48 changes: 48 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.controllers.TopicController;
import org.akhq.models.KeyValue;
import org.akhq.models.Partition;
import org.akhq.models.Record;
import org.akhq.models.Topic;
Expand Down Expand Up @@ -457,6 +458,35 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
);
}

public List<RecordMetadata> produce(
String clusterId,
String topic,
String value,
Map<String, String> headers,
Optional<String> key,
Optional<Integer> partition,
Optional<Long> timestamp,
Optional<Integer> keySchemaId,
Optional<Integer> valueSchemaId,
Boolean multiMessage,
Optional<String> keyValueSeparator) throws ExecutionException, InterruptedException {

List<RecordMetadata> produceResults = new ArrayList<>();

// Distinguish between single record produce, and multiple messages
if (multiMessage.booleanValue()) {
// Split key-value pairs and produce them
for (KeyValue<String, String> kvPair : splitMultiMessage(value, keyValueSeparator.orElseThrow())) {
produceResults.add(produce(clusterId, topic, kvPair.getValue(), headers, Optional.of(kvPair.getKey()),
partition, timestamp, keySchemaId, valueSchemaId));
}
} else {
produceResults.add(
produce(clusterId, topic, value, headers, key, partition, timestamp, keySchemaId, valueSchemaId));
}
return produceResults;
}

private RecordMetadata produce(
String clusterId,
String topic, byte[] value,
Expand Down Expand Up @@ -486,6 +516,23 @@ private RecordMetadata produce(
.get();
}

/**
* Splits a multi-message into a list of key-value pairs.
* @param value The multi-message string submitted by the {@link TopicController}
* @param keyValueSeparator The character(s) separating each key from their corresponding value
* @return A list of {@link KeyValue}, holding the split pairs
*/
private List<KeyValue<String, String>> splitMultiMessage(String value, String keyValueSeparator) {
return List.of(value.split("\r\n|\r|\n")).stream().map(v -> splitKeyValue(v, keyValueSeparator))
.collect(Collectors.toList());
}

private KeyValue<String, String> splitKeyValue(String keyValueStr, String keyValueSeparator) {
String[] keyValue = null;
keyValue = keyValueStr.split(keyValueSeparator, 2);
return new KeyValue<>(keyValue[0].trim(),keyValue[1]);
}

public void emptyTopic(String clusterId, String topicName) throws ExecutionException, InterruptedException {
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
var topic = topicRepository.findByName(clusterId, topicName);
Expand Down Expand Up @@ -1209,3 +1256,4 @@ private static class EndOffsetBound {
private final KafkaConsumer<byte[], byte[]> consumer;
}
}

Loading