Skip to content

Commit

Permalink
fix(topicdata): fix and enhance search (#1538)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexisSouquiere authored Jul 31, 2023
1 parent f36ae79 commit 96fd6af
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 49 deletions.
8 changes: 8 additions & 0 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,14 @@ class TopicData extends Root {
{this._renderSearchFilter('keySubject', 'Key Subject')}
{this._renderSearchFilter('valueSubject', 'Value Subject')}

<p style={{ fontStyle: 'italic' }}>
* Whitespaces in search string are considered as separators for search patterns unless
enclosed with double quotes
<br />
In case of multiple patterns, OR operator is applied for Contains / Equals. AND is applied
for Not contains
</p>

<div style={{ display: 'flex' }}>
<button
className="btn btn-primary inline-block search"
Expand Down
116 changes: 67 additions & 49 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
@Singleton
@Slf4j
public class RecordRepository extends AbstractRepository {
public static final String SEARCH_SPLIT_REGEX = " (?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)";
@Inject
private KafkaModule kafkaModule;

Expand Down Expand Up @@ -164,7 +165,7 @@ private List<Record> consumeOldest(Topic topic, Options options) {

for (ConsumerRecord<byte[], byte[]> record : records) {
Record current = newRecord(record, options, topic);
if (searchFilter(options, current)) {
if (matchFilters(options, current)) {
filterMessageLength(current);
list.add(current);
}
Expand Down Expand Up @@ -323,7 +324,7 @@ private List<Record> consumeNewest(Topic topic, Options options) {
break;
}
Record current = newRecord(record, options, topic);
if (searchFilter(options, current)) {
if (matchFilters(options, current)) {
filterMessageLength(current);
list.add(current);
}
Expand Down Expand Up @@ -711,7 +712,7 @@ public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws
currentEvent.updateProgress(record);

Record current = newRecord(record, options, topic);
if (searchFilter(options, current)) {
if (matchFilters(options, current)) {
list.add(current);
matchesCount.getAndIncrement();

Expand Down Expand Up @@ -741,49 +742,49 @@ public Flowable<Event<SearchEvent>> search(Topic topic, Options options) throws
});
}

private static boolean searchFilter(BaseOptions options, Record record) {
private boolean matchFilters(BaseOptions options, Record record) {

if (options.getSearch() != null) {
return search(options.getSearch(), Arrays.asList(record.getKey(), record.getValue()));
return matchFilter(options.getSearch(), Arrays.asList(record.getKey(), record.getValue()));
} else {
if (options.getSearchByKey() != null) {
if (!search(options.getSearchByKey(), Collections.singletonList(record.getKey()))) {
if (!matchFilter(options.getSearchByKey(), Collections.singletonList(record.getKey()))) {
return false;
}
}

if (options.getSearchByValue() != null) {
if (!search(options.getSearchByValue(), Collections.singletonList(record.getValue()))) {
if (!matchFilter(options.getSearchByValue(), Collections.singletonList(record.getValue()))) {
return false;
}
}

if (options.getSearchByHeaderKey() != null) {
if (!search(options.getSearchByHeaderKey(), record.getHeadersKeySet())) {
if (!matchFilter(options.getSearchByHeaderKey(), record.getHeadersKeySet())) {
return false;
}
}

if (options.getSearchByHeaderValue() != null) {
if (!search(options.getSearchByHeaderValue(), record.getHeadersValues())) {
if (!matchFilter(options.getSearchByHeaderValue(), record.getHeadersValues())) {
return false;
}
}

if (options.getSearchByKeySubject() != null) {
if (!search(options.getSearchByKeySubject(), Collections.singletonList(record.getKeySubject()))) {
if (!matchFilter(options.getSearchByKeySubject(), Collections.singletonList(record.getKeySubject()))) {
return false;
}
}

if (options.getSearchByValueSubject() != null) {
return search(options.getSearchByValueSubject(), Collections.singletonList(record.getValueSubject()));
return matchFilter(options.getSearchByValueSubject(), Collections.singletonList(record.getValueSubject()));
}
}
return true;
}

private static boolean search(Search searchFilter, Collection<String> stringsToSearch) {
private boolean matchFilter(Search searchFilter, Collection<String> stringsToSearch) {
switch (searchFilter.searchMatchType) {
case EQUALS:
return equalsAll(searchFilter.getText(), stringsToSearch);
Expand All @@ -794,65 +795,82 @@ private static boolean search(Search searchFilter, Collection<String> stringsToS
}
}

private static boolean containsAll(String search, Collection<String> in) {
/**
* Check that one of the input strings contains at least one time the patterns in the search string
* Patterns are extracted from the search string based on whitespace unless enclosed with double quotes
*
* @param search - the search string
* @param in - all the input string to check
* @return true if input matches at least one time the patterns
*/
private boolean containsAll(String search, Collection<String> in) {
if (search.equals("null")) {
return in
.stream()
.allMatch(Objects::isNull);
}

String[] split = search.toLowerCase().split("\\s");
for (String s : in) {
if(s != null) {
s = s.toLowerCase();
for (String k : split) {
if (s.contains(k)) {
return true;
}
}
}
}
return false;
return in.parallelStream()
.anyMatch(s -> extractSearchPatterns(search)
.stream()
.anyMatch(s.toLowerCase()::contains));
}

private static boolean equalsAll(String search, Collection<String> in) {
/**
* Check that one of the input strings matches exactly at least one time the patterns in the search string
* Patterns are extracted from the search string based on whitespace unless enclosed with double quotes
*
* @param search - the search string
* @param in - all the input string to check
* @return true if one of the input matches exactly at least one time the patterns
*/
private boolean equalsAll(String search, Collection<String> in) {
if (search.equals("null")) {
return in
.stream()
.allMatch(Objects::isNull);
}

String[] split = search.toLowerCase().split("\\s");
for (String s : in) {
if(s != null) {
final String lowerS = s.toLowerCase();

return Stream.of(split)
.filter(lowerS::equals)
.count() == split.length;
}
}
return false;
return in.parallelStream().filter(Objects::nonNull)
.anyMatch(s -> extractSearchPatterns(search).contains(s.toLowerCase()));
}

private static boolean notContainsAll(String search, Collection<String> in) {
/**
* Check that one of the input strings does not contain at least one time the patterns in the search string
* Patterns are extracted from the search string based on whitespace unless enclosed with double quotes
*
* @param search - the search string
* @param in - all the input string to check
* @return true if input does not contain at least one time the patterns
*/
private boolean notContainsAll(String search, Collection<String> in) {
if (search.equals("null")) {
return in
.stream()
.noneMatch(Objects::isNull);
}

String[] split = search.toLowerCase().split("\\s");
for (String s : in) {
if(s != null) {
final String lowerS = s.toLowerCase();
return in.parallelStream()
.filter(Objects::nonNull)
.anyMatch(s -> extractSearchPatterns(search)
.stream()
.noneMatch(s.toLowerCase()::contains));
}

return Stream.of(split)
.filter(lowerS::contains)
.count() == split.length;
}
}
return true;
/**
* Extract search patterns from the search string by splitting on whitespace
* If a pattern is enclosed with double quotes, the white space will be ignored during splitting
*
* @param searchString the search string to split into patterns
* @return search patterns
*/
private List<String> extractSearchPatterns(String searchString) {
return Arrays.stream(searchString.toLowerCase().split(SEARCH_SPLIT_REGEX, -1))
.map(s -> {
// Update pattern enclosed with double quotes by removing backslashes and start/end double quotes
s = s.replaceAll("\\\\", "");
return s.startsWith("\"") ? s.substring(1, s.length() - 1) : s;
}).collect(Collectors.toList());
}

@ToString
Expand Down Expand Up @@ -965,7 +983,7 @@ public Flowable<Event<TailEvent>> tail(String clusterId, TailOptions options) {
);

Record current = newRecord(record, options, state.getTopics().get(record.topic()));
if (searchFilter(options, current)) {
if (matchFilters(options, current)) {
list.add(current);
log.trace(
"Record [topic: {}] [partition: {}] [offset: {}] [key: {}]",
Expand Down

0 comments on commit 96fd6af

Please sign in to comment.