Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] kafka: Fixed segfault issue with auditing and mTLS #23289

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ ss::future<> connection_context::start() {
}

ss::future<> connection_context::stop() {
if (_hook) {
if (_hook && is_linked()) {
_hook.value().get().erase(_hook.value().get().iterator_to(*this));
}
if (conn) {
Expand Down
5 changes: 4 additions & 1 deletion src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ ss::future<> server::apply(ss::lw_shared_ptr<net::connection> conn) {

std::exception_ptr eptr;
try {
co_await ctx->start();
// Must call start() to ensure `ctx` is inserted into the `_connections`
// list. Otherwise if enqueing the audit message fails and `stop()` is
// called, this will result in a segfault.
if (authn_method == config::broker_authn_method::mtls_identity) {
auto authn_event = make_auth_event_options(mtls_state.value(), ctx);
if (!ctx->server().audit_mgr().enqueue_authn_event(
Expand All @@ -343,7 +347,6 @@ ss::future<> server::apply(ss::lw_shared_ptr<net::connection> conn) {
"system error");
}
}
co_await ctx->start();
co_await ctx->process();
} catch (...) {
eptr = std::current_exception();
Expand Down
4 changes: 3 additions & 1 deletion src/v/security/audit/audit_log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ ss::future<> audit_client::update_status(kafka::error_code errc) {
}
} else if (_last_errc == kafka::error_code::illegal_sasl_state) {
/// The status changed from erraneous to anything else
if (errc != kafka::error_code::illegal_sasl_state) {
if (
errc != kafka::error_code::illegal_sasl_state
&& errc != kafka::error_code::broker_not_available) {
co_await _sink->update_auth_status(
audit_sink::auth_misconfigured_t::no);
}
Expand Down
129 changes: 129 additions & 0 deletions tests/rptest/tests/audit_log_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ def change_max_buffer_size_per_shard(self, new_size: int):
self._modify_cluster_config(
{'audit_queue_max_buffer_size_per_shard': new_size})

def modify_audit_enabled(self, enabled: bool):
"""
Modifies value of audit_enabled
"""
self._modify_cluster_config({'audit_enabled': enabled})

def modify_node_config(self, node, update_fn, skip_readiness_check=True):
"""Modifies the current node configuration, restarts the node for
changes to take effect
Expand Down Expand Up @@ -1584,6 +1590,129 @@ def test_no_audit_user_authn(self):
pass


class AuditLogTestInvalidConfigBase(AuditLogTestBase):
username = 'test'
password = 'test12345'
algorithm = 'SCRAM-SHA-256'
"""
Tests situations where audit log client is not properly configured
"""
def __init__(self,
test_context,
audit_log_config=AuditLogConfig(enabled=False,
num_partitions=1,
event_types=[]),
log_config=LoggingConfig('info',
logger_levels={
'auditing': 'trace',
'kafka': 'trace',
'security': 'trace'
}),
**kwargs):
self.test_context = test_context
# The 'none' below will cause the audit log client to not be configured properly
self._audit_log_client_config = redpanda.AuditLogConfig(
listener_port=9192, listener_authn_method='none')
self._audit_log_client_config.require_client_auth = False
self._audit_log_client_config.enable_broker_tls = False

super(AuditLogTestInvalidConfigBase, self).__init__(
test_context=test_context,
audit_log_config=audit_log_config,
log_config=log_config,
audit_log_client_config=self._audit_log_client_config,
**kwargs)

def setUp(self):
super().setUp()
self.admin.create_user(self.username, self.password, self.algorithm)
self.get_super_rpk().acl_create_allow_cluster(self.username, 'All')
# Following is important so the rest of ducktape functions correctly
self.modify_audit_excluded_principals(['admin'])
self.modify_audit_event_types(['authenticate'])
self.modify_audit_enabled(True)
# Waits for all audit clients to enter the same state where any attempt
# to enqueue an event will be rejected because the client is misconfigured
wait_until(lambda: self.redpanda.search_log_all(
'error_code: illegal_sasl_state'),
timeout_sec=30,
backoff_sec=2,
err_msg="Did not see illegal_sasl_state error message")


class AuditLogTestInvalidConfig(AuditLogTestInvalidConfigBase):
def __init__(self, test_context):
super(AuditLogTestInvalidConfig, self).__init__(
test_context=test_context,
security=AuditLogTestSecurityConfig(user_creds=(self.username,
self.password,
self.algorithm)))

@cluster(num_nodes=4,
log_allow_list=[
r'Failed to append authentication event to audit log',
r'Failed to audit.*'
])
def test_invalid_config(self):
"""
Test validates that the topic is failed to get created if audit
system is not configured correctly.
"""
try:
self.get_rpk().create_topic('test')
assert False, "Should not have created a topic"
except RpkException as e:
assert "audit system failure: BROKER_NOT_AVAILABLE" in str(
e
), f'{str(e)} does not contain "audit system failure: BROKER_NOT_AVAILABLE"'


class AuditLogTestInvalidConfigMTLS(AuditLogTestInvalidConfigBase):
"""
Tests situations where audit log client is not properly configured and mTLS enabled
"""
def __init__(self, test_context):
self.test_context = test_context
self.tls = tls.TLSCertManager(self.logger)
self.user_cert = self.tls.create_cert(socket.gethostname(),
common_name=self.username,
name='base_client')
self.admin_user_cert = self.tls.create_cert(
socket.gethostname(),
common_name=RedpandaServiceBase.SUPERUSER_CREDENTIALS[0],
name='admin_client')
self._security_config = AuditLogTestSecurityConfig(
admin_cert=self.admin_user_cert, user_cert=self.user_cert)
self._security_config.tls_provider = MTLSProvider(self.tls)
self._security_config.principal_mapping_rules = 'RULE:.*CN=(.*).*/$1/'

super(AuditLogTestInvalidConfigMTLS,
self).__init__(test_context=test_context,
security=self._security_config)

@cluster(
num_nodes=4,
log_allow_list=[
r'Failed to append authentication event to audit log',
r'Failed to audit.*',
r'Failed to enqueue mTLS authentication event - audit log system error'
])
def test_invalid_config_mtls(self):
"""
Validates that mTLS authn is rejected when audit client is misconfigured.
Also ensures there is no segfault: https://redpandadata.atlassian.net/browse/CORE-7245
"""
try:
self.get_rpk().create_topic('test')
assert False, "Should not have created a topic"
except RpkException as e:
pass

assert self.redpanda.search_log_any(
'Failed to enqueue mTLS authentication event - audit log system error'
)


class AuditLogTestKafkaTlsApi(AuditLogTestBase):
"""
Tests that validate audit log messages for users authenticated via mTLS
Expand Down