Added validations for the cluster parameter in API using exceptions #1104

Merged

8 changes: 8 additions & 0 deletions src/main/java/org/akhq/controllers/ErrorController.java
@@ -13,6 +13,7 @@
import io.micronaut.security.authentication.AuthorizationException;
import io.micronaut.security.rules.SecurityRule;
import lombok.extern.slf4j.Slf4j;
import org.akhq.modules.InvalidClusterException;
import org.apache.kafka.common.errors.ApiException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ConcurrentConfigModificationException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException;
@@ -108,4 +109,11 @@ public HttpResponse<?> notFound(HttpRequest<?> request) throws URISyntaxException {
return HttpResponse.<JsonError>notFound()
.body(error);
}
@Error(global = true)
public HttpResponse<?> error(HttpRequest<?> request, InvalidClusterException e) {
JsonError error = new JsonError(e.getMessage())
.link(Link.SELF, Link.of(request.getUri()));

return HttpResponse.status(HttpStatus.CONFLICT).body(error);
}
}
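
For context, a sketch of how the new handler behaves end to end: requesting any endpoint with a cluster id that is not configured should now return 409 Conflict with a JSON error body instead of an unhandled 500. The route "/api/unknown-cluster/topic" and the test wiring below are illustrative assumptions, not part of this PR.

// Hypothetical integration test; assumes an AKHQ-style route ("/api/{cluster}/topic")
// and micronaut-test-junit5 on the classpath.
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

@MicronautTest
class InvalidClusterResponseTest {
    @Inject
    @Client("/")
    HttpClient client;

    @Test
    void unknownClusterIsRejectedWithConflict() {
        // The blocking client throws on any non-2xx response status
        HttpClientResponseException e = assertThrows(
            HttpClientResponseException.class,
            () -> client.toBlocking().exchange(HttpRequest.GET("/api/unknown-cluster/topic"))
        );

        // ErrorController maps InvalidClusterException to HttpStatus.CONFLICT
        assertEquals(HttpStatus.CONFLICT, e.getStatus());
    }
}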
7 changes: 7 additions & 0 deletions src/main/java/org/akhq/modules/InvalidClusterException.java
@@ -0,0 +1,7 @@
package org.akhq.modules;

public class InvalidClusterException extends RuntimeException {
public InvalidClusterException(String message) {
super(message);
}
}
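
Since InvalidClusterException extends RuntimeException, the throws clauses added throughout KafkaModule below are documentation rather than compiler-enforced; callers that do not catch the exception simply let it propagate to ErrorController. A minimal caller sketch (assumed code, not part of this PR):

// Hypothetical caller; KafkaModule would normally be injected by Micronaut.
import org.akhq.modules.InvalidClusterException;
import org.akhq.modules.KafkaModule;

class ClusterDescriber {
    private final KafkaModule kafkaModule;

    ClusterDescriber(KafkaModule kafkaModule) {
        this.kafkaModule = kafkaModule;
    }

    void describe(String clusterId) {
        try {
            kafkaModule.getAdminClient(clusterId).describeCluster();
        } catch (InvalidClusterException e) {
            // Unchecked: if not caught here, it surfaces as HTTP 409 via ErrorController
            System.err.println(e.getMessage()); // e.g. "Invalid cluster 'foo'"
        }
    }
}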
125 changes: 95 additions & 30 deletions src/main/java/org/akhq/modules/KafkaModule.java
@@ -12,6 +12,8 @@
import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.exceptions.HttpStatusException;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.akhq.configs.AbstractProperties;
@@ -46,7 +48,15 @@ public List<String> getClustersList() {
.collect(Collectors.toList());
}

public Connection getConnection(String cluster) {
public boolean clusterExists(String cluster) {
return this.getClustersList().contains(cluster);
}

public Connection getConnection(String cluster) throws InvalidClusterException {
if (!this.clusterExists(cluster)) {
throw new InvalidClusterException("Invalid cluster '" + cluster + "'");
}

return this.connections
.stream()
.filter(r -> r.getName().equals(cluster))
@@ -60,30 +70,40 @@ private Properties getDefaultsProperties(List<? extends AbstractProperties> current, String type) {
current
.stream()
.filter(r -> r.getName().equals(type))
.forEach(r -> r.getProperties()
.forEach(properties::put)
);
.forEach(r -> properties.putAll(r.getProperties()));

return properties;
}

private Properties getConsumerProperties(String clusterId) {
private Properties getConsumerProperties(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Properties props = new Properties();
props.putAll(this.getDefaultsProperties(this.defaults, "consumer"));
props.putAll(this.getDefaultsProperties(this.connections, clusterId));

return props;
}

private Properties getProducerProperties(String clusterId) {
private Properties getProducerProperties(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Properties props = new Properties();
props.putAll(this.getDefaultsProperties(this.defaults, "producer"));
props.putAll(this.getDefaultsProperties(this.connections, clusterId));

return props;
}

private Properties getAdminProperties(String clusterId) {
private Properties getAdminProperties(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Properties props = new Properties();
props.putAll(this.getDefaultsProperties(this.defaults, "admin"));
props.putAll(this.getDefaultsProperties(this.connections, clusterId));
@@ -93,23 +113,35 @@ private Properties getAdminProperties(String clusterId) {

private final Map<String, AdminClient> adminClient = new HashMap<>();

public AdminClient getAdminClient(String clusterId) {
public AdminClient getAdminClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

if (!this.adminClient.containsKey(clusterId)) {
this.adminClient.put(clusterId, AdminClient.create(this.getAdminProperties(clusterId)));
}

return this.adminClient.get(clusterId);
}

public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId) {
public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

return new KafkaConsumer<>(
this.getConsumerProperties(clusterId),
new ByteArrayDeserializer(),
new ByteArrayDeserializer()
);
}

public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties properties) {
public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties properties) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Properties props = this.getConsumerProperties(clusterId);
props.putAll(properties);

@@ -122,7 +154,11 @@ public KafkaConsumer<byte[], byte[]> getConsumer(String clusterId, Properties properties) {

private final Map<String, KafkaProducer<byte[], byte[]>> producers = new HashMap<>();

public KafkaProducer<byte[], byte[]> getProducer(String clusterId) {
public KafkaProducer<byte[], byte[]> getProducer(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

if (!this.producers.containsKey(clusterId)) {
this.producers.put(clusterId, new KafkaProducer<>(
this.getProducerProperties(clusterId),
Expand All @@ -134,7 +170,11 @@ public KafkaProducer<byte[], byte[]> getProducer(String clusterId) {
return this.producers.get(clusterId);
}

public AvroSchemaProvider getAvroSchemaProvider(String clusterId) {
public AvroSchemaProvider getAvroSchemaProvider(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
avroSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
@@ -143,27 +183,39 @@ public AvroSchemaProvider getAvroSchemaProvider(String clusterId) {
return avroSchemaProvider;
}

public JsonSchemaProvider getJsonSchemaProvider(String clusterId) {
public JsonSchemaProvider getJsonSchemaProvider(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
jsonSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 1000)
));

return jsonSchemaProvider;
}

public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) {
public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
protobufSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 1000)
));

return protobufSchemaProvider;
}

public RestService getRegistryRestClient(String clusterId) {
public RestService getRegistryRestClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

Connection connection = this.getConnection(clusterId);

if (connection.getSchemaRegistry() != null) {
@@ -172,12 +224,16 @@ public RestService getRegistryRestClient(String clusterId) {
);

if (connection.getSchemaRegistry().getProperties() != null
&& !connection.getSchemaRegistry().getProperties().isEmpty()) {

Map<String, Object> sslConfigs =
connection.getSchemaRegistry().getProperties().entrySet().stream()
.filter(e -> e.getKey().startsWith("schema.registry."))
.collect(Collectors.toMap(e -> e.getKey().substring("schema.registry.".length()), Map.Entry::getValue));
connection
.getSchemaRegistry()
.getProperties()
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith("schema.registry."))
.collect(Collectors.toMap(e -> e.getKey().substring("schema.registry.".length()), Map.Entry::getValue));

SslFactory sslFactory = new SslFactory(sslConfigs);
if (sslFactory != null && sslFactory.sslContext() != null) {
@@ -205,13 +261,18 @@ public RestService getRegistryRestClient(String clusterId) {
}
return restService;
}

return null;
}

private final Map<String, SchemaRegistryClient> registryClient = new HashMap<>();


public SchemaRegistryClient getRegistryClient(String clusterId) {
public SchemaRegistryClient getRegistryClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

if (!this.registryClient.containsKey(clusterId)) {
Connection connection = this.getConnection(clusterId);

@@ -237,36 +298,40 @@ public SchemaRegistryClient getRegistryClient(String clusterId) {

private final Map<String, Map<String, KafkaConnectClient>> connectRestClient = new HashMap<>();

public Map<String, KafkaConnectClient> getConnectRestClient(String clusterId) {
public Map<String, KafkaConnectClient> getConnectRestClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
}

if (!this.connectRestClient.containsKey(clusterId)) {
Connection connection = this.getConnection(clusterId);

if (connection.getConnect() != null && !connection.getConnect().isEmpty()) {

Map<String,KafkaConnectClient> mapConnects = new HashMap<>();
Map<String, KafkaConnectClient> mapConnects = new HashMap<>();
connection.getConnect().forEach(connect -> {

URIBuilder uri = URIBuilder.fromString(connect.getUrl().toString());
Configuration configuration = new Configuration(uri.toNormalizedURI(false).toString());

if (connect.getBasicAuthUsername() != null) {
configuration.useBasicAuth(
connect.getBasicAuthUsername(),
connect.getBasicAuthPassword()
);
}

if (connect.getSslTrustStore() != null) {
configuration.useTrustStore(
new File(connect.getSslTrustStore()),
connect.getSslTrustStorePassword()
);
}

if (connect.getSslKeyStore() != null) {
configuration.useKeyStore(
new File(connect.getSslKeyStore()),
connect.getSslKeyStorePassword()
);
}
mapConnects.put(connect.getName(), new KafkaConnectClient(configuration));
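
The same three-line existence guard now opens every public method in KafkaModule. A natural follow-up (a sketch only, not something this PR does) would be to extract it into a private helper so each entry point starts with a single call:

// Hypothetical consolidation of the repeated guard; requireClusterExists is not in this PR.
private void requireClusterExists(String clusterId) throws InvalidClusterException {
    if (!this.clusterExists(clusterId)) {
        throw new InvalidClusterException("Invalid cluster '" + clusterId + "'");
    }
}

Each public method would then begin with requireClusterExists(clusterId) instead of repeating the if block.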