From e549af51af030aeb2646a99c3c9bc6a905e71cea Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Thu, 23 May 2024 16:46:18 -0300 Subject: [PATCH] feat: tls and authorization on SPU mirroring (#4022) * feat: tls and authorization on SPU mirroring * test: add util function to create public server with root auth --- Cargo.lock | 1 + crates/fluvio-auth/src/lib.rs | 1 + crates/fluvio-auth/src/root.rs | 51 +++++++++++++++ crates/fluvio-sc/src/init.rs | 3 +- crates/fluvio-sc/src/services/auth/mod.rs | 51 +-------------- .../services/public_api/smartmodule/list.rs | 3 +- crates/fluvio-spu/Cargo.toml | 1 + crates/fluvio-spu/src/config/cli.rs | 2 + .../src/mirroring/home/connection.rs | 59 +++++++++++++++--- .../src/mirroring/remote/controller.rs | 55 +++++++++++++--- .../src/mirroring/test/integration.rs | 10 ++- crates/fluvio-spu/src/services/auth/mod.rs | 37 +++++++++++ crates/fluvio-spu/src/services/mod.rs | 1 + crates/fluvio-spu/src/services/public/mod.rs | 62 ++++++++++++++----- .../src/services/public/tests/mod.rs | 16 ++++- .../src/services/public/tests/produce.rs | 33 +++++----- .../src/services/public/tests/stream_fetch.rs | 43 ++++++------- crates/fluvio-spu/src/start.rs | 41 ++++++------ 18 files changed, 318 insertions(+), 152 deletions(-) create mode 100644 crates/fluvio-auth/src/root.rs create mode 100644 crates/fluvio-spu/src/services/auth/mod.rs diff --git a/Cargo.lock b/Cargo.lock index d900b3374d..37df4f6c72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2963,6 +2963,7 @@ dependencies = [ "event-listener 3.1.0", "flate2", "fluvio", + "fluvio-auth", "fluvio-compression", "fluvio-controlplane", "fluvio-controlplane-metadata", diff --git a/crates/fluvio-auth/src/lib.rs b/crates/fluvio-auth/src/lib.rs index 816fec5996..26cf912036 100644 --- a/crates/fluvio-auth/src/lib.rs +++ b/crates/fluvio-auth/src/lib.rs @@ -1,6 +1,7 @@ mod policy; mod error; +pub mod root; pub mod x509; pub use policy::*; diff --git a/crates/fluvio-auth/src/root.rs b/crates/fluvio-auth/src/root.rs new file mode 100644 index 0000000000..1140e75335 --- /dev/null +++ b/crates/fluvio-auth/src/root.rs @@ -0,0 +1,51 @@ +use async_trait::async_trait; +use fluvio_controlplane_metadata::extended::ObjectType; +use fluvio_socket::FluvioSocket; + +use crate::{AuthContext, AuthError, Authorization, InstanceAction, TypeAction}; + +/// Authorization that allows anything +#[derive(Debug, Clone, Default)] +pub struct RootAuthorization {} + +#[async_trait] +impl Authorization for RootAuthorization { + type Context = RootAuthContext; + + async fn create_auth_context( + &self, + _socket: &mut FluvioSocket, + ) -> Result { + Ok(RootAuthContext {}) + } +} + +impl RootAuthorization { + pub fn new() -> Self { + Self {} + } +} + +#[derive(Debug)] +pub struct RootAuthContext {} + +#[async_trait] +impl AuthContext for RootAuthContext { + async fn allow_type_action( + &self, + _ty: ObjectType, + _action: TypeAction, + ) -> Result { + Ok(true) + } + + /// check if specific instance of spec can be deleted + async fn allow_instance_action( + &self, + _ty: ObjectType, + _action: InstanceAction, + _key: &str, + ) -> Result { + Ok(true) + } +} diff --git a/crates/fluvio-sc/src/init.rs b/crates/fluvio-sc/src/init.rs index 956cba2785..beace189f2 100644 --- a/crates/fluvio-sc/src/init.rs +++ b/crates/fluvio-sc/src/init.rs @@ -122,13 +122,14 @@ where mod pub_server { use std::sync::Arc; + use fluvio_auth::root::RootAuthorization; use tracing::info; use crate::services::start_public_server; use crate::core::SharedContext; use fluvio_controlplane_metadata::core::MetadataItem; - use crate::services::auth::{AuthGlobalContext, RootAuthorization, ReadOnlyAuthorization}; + use crate::services::auth::{AuthGlobalContext, ReadOnlyAuthorization}; use crate::services::auth::basic::{BasicAuthorization, BasicRbacPolicy}; pub fn start(ctx: SharedContext, auth_policy_option: Option) diff --git a/crates/fluvio-sc/src/services/auth/mod.rs b/crates/fluvio-sc/src/services/auth/mod.rs index 1936136dd7..aee5fa1114 100644 --- a/crates/fluvio-sc/src/services/auth/mod.rs +++ b/crates/fluvio-sc/src/services/auth/mod.rs @@ -33,53 +33,6 @@ mod common { } } - /// Authorization that allows anything - /// Used for personal development - #[derive(Debug, Clone)] - pub struct RootAuthorization {} - - #[async_trait] - impl Authorization for RootAuthorization { - type Context = RootAuthContext; - - async fn create_auth_context( - &self, - _socket: &mut FluvioSocket, - ) -> Result { - Ok(RootAuthContext {}) - } - } - - impl RootAuthorization { - pub fn new() -> Self { - Self {} - } - } - - #[derive(Debug)] - pub struct RootAuthContext {} - - #[async_trait] - impl AuthContext for RootAuthContext { - async fn allow_type_action( - &self, - _ty: ObjectType, - _action: TypeAction, - ) -> Result { - Ok(true) - } - - /// check if specific instance of spec can be deleted - async fn allow_instance_action( - &self, - _ty: ObjectType, - _action: InstanceAction, - _key: &str, - ) -> Result { - Ok(true) - } - } - /// Auth Service Context, this hold individual context that is enough enforce auth /// for this service context #[derive(Debug, Clone)] @@ -145,9 +98,9 @@ mod common { #[cfg(test)] mod test { - use fluvio_auth::AuthContext; + use fluvio_auth::{root::RootAuthContext, AuthContext}; - use super::{ObjectType, ReadOnlyAuthContext, RootAuthContext, TypeAction}; + use super::{ObjectType, ReadOnlyAuthContext, TypeAction}; /// test read only context /// read only context allows read on everything diff --git a/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs b/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs index 398f028a38..200dd28ffa 100644 --- a/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs +++ b/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs @@ -90,6 +90,7 @@ mod test { use std::sync::Arc; + use fluvio_auth::root::RootAuthContext; use fluvio_stream_dispatcher::store::StoreContext; use fluvio_stream_model::fixture::TestMeta; use fluvio_stream_model::store::{MetadataStoreObject, LocalStore}; @@ -97,8 +98,6 @@ mod test { SmartModuleSpec, SmartModuleMetadata, SmartModulePackage, FluvioSemVersion, }; - use crate::services::auth::RootAuthContext; - use super::fetch_smart_modules; type TestSmartModuleStore = LocalStore; diff --git a/crates/fluvio-spu/Cargo.toml b/crates/fluvio-spu/Cargo.toml index 3fcfe57035..b7bbdbc824 100644 --- a/crates/fluvio-spu/Cargo.toml +++ b/crates/fluvio-spu/Cargo.toml @@ -50,6 +50,7 @@ mimalloc = { workspace = true } # Fluvio dependencies fluvio = { workspace = true } +fluvio-auth = { workspace = true } fluvio-types = { workspace = true, features = ["events"] } fluvio-storage = { workspace = true, features = ["iterators"] } fluvio-compression = { workspace = true } diff --git a/crates/fluvio-spu/src/config/cli.rs b/crates/fluvio-spu/src/config/cli.rs index 78bfc73b7d..668ba92969 100644 --- a/crates/fluvio-spu/src/config/cli.rs +++ b/crates/fluvio-spu/src/config/cli.rs @@ -8,6 +8,7 @@ use std::io::Error as IoError; use std::process; use std::io::ErrorKind; +use fluvio_future::openssl::SslVerifyMode; use tracing::debug; use tracing::info; use clap::Parser; @@ -178,6 +179,7 @@ impl SpuOpt { .ok_or_else(|| IoError::new(ErrorKind::NotFound, "missing ca cert"))?; TlsAcceptor::builder() .map_err(|err| err.into_io_error())? + .with_ssl_verify_mode(SslVerifyMode::PEER) .with_ca_from_pem_file(ca_path) .map_err(|err| err.into_io_error())? } else { diff --git a/crates/fluvio-spu/src/mirroring/home/connection.rs b/crates/fluvio-spu/src/mirroring/home/connection.rs index a0f65aa2b4..67193744ab 100644 --- a/crates/fluvio-spu/src/mirroring/home/connection.rs +++ b/crates/fluvio-spu/src/mirroring/home/connection.rs @@ -6,17 +6,21 @@ use tokio::select; use tracing::{debug, error, instrument, warn}; use anyhow::Result; +use fluvio_auth::{AuthContext, InstanceAction}; +use fluvio_controlplane_metadata::mirror::MirrorType; +use fluvio_controlplane_metadata::extended::ObjectType; use fluvio_future::timer::sleep; use fluvio_protocol::api::RequestMessage; use fluvio_spu_schema::server::mirror::StartMirrorRequest; use futures_util::StreamExt; -use fluvio_socket::{FluvioStream, ExclusiveFlvSink}; +use fluvio_socket::{ExclusiveFlvSink, FluvioStream}; use crate::core::DefaultSharedGlobalContext; use crate::mirroring::remote::api_key::MirrorRemoteApiEnum; use crate::mirroring::remote::remote_api::RemoteMirrorRequest; use crate::mirroring::remote::sync::DefaultPartitionSyncRequest; use crate::replication::leader::SharedFileLeaderState; +use crate::services::auth::SpuAuthServiceContext; use super::update_offsets::UpdateHomeOffsetRequest; @@ -59,18 +63,55 @@ impl fmt::Debug for MirrorHomeHandler { impl MirrorHomeHandler { /// start handling mirror request sync from remote /// it is called from public service handler - pub(crate) async fn respond( - ctx: DefaultSharedGlobalContext, + pub(crate) async fn respond( req_msg: RequestMessage, + auth_ctx: &SpuAuthServiceContext, sink: ExclusiveFlvSink, - stream: &mut FluvioStream, + stream: FluvioStream, ) { + // authorization check + if let Ok(authorized) = auth_ctx + .auth + .allow_instance_action( + ObjectType::Mirror, + InstanceAction::Update, + &req_msg.request.remote_cluster_id, + ) + .await + { + if !authorized { + warn!( + "identity mismatch for remote_id: {}", + req_msg.request.remote_cluster_id + ); + return; + } + } + + // check if remote cluster exists + let mirrors = auth_ctx.global_ctx.mirrors_localstore().all_values(); + let remote = mirrors + .iter() + .find(|mirror| match &mirror.spec.mirror_type { + MirrorType::Remote(r) => r.id == req_msg.request.remote_cluster_id, + _ => false, + }); + + if remote.is_none() { + warn!( + "remote cluster not found: {}", + req_msg.request.remote_cluster_id + ); + return; + } + debug!("handling mirror request: {:#?}", req_msg); let remote_replica = req_msg.request.remote_replica; let remote_cluster_id = req_msg.request.remote_cluster_id; let _access_key = req_msg.request.access_key; - if let Some(leader) = ctx + if let Some(leader) = auth_ctx + .global_ctx .leaders_state() .find_mirror_home_leader(&remote_cluster_id, &remote_replica) .await @@ -79,21 +120,19 @@ impl MirrorHomeHandler { // map to actual home let metrics = Arc::new(MirrorRequestMetrics::new()); - // TODO: perform authorization let handler: MirrorHomeHandler = Self { metrics: metrics.clone(), leader, - ctx, + ctx: auth_ctx.global_ctx.clone(), }; if let Err(err) = handler.inner_respond(sink, stream).await { error!("error handling mirror request: {:#?}", err); } } else { - // TODO: handle no home partition warn!( remote_replica, - remote_cluster_id, "no leader replica found for this" + remote_cluster_id, "no leader replica found for this mirror request" ); } } @@ -102,7 +141,7 @@ impl MirrorHomeHandler { async fn inner_respond( self, mut sink: ExclusiveFlvSink, - stream: &mut FluvioStream, + mut stream: FluvioStream, ) -> Result<()> { // first send let mut api_stream = stream.api_stream::(); diff --git a/crates/fluvio-spu/src/mirroring/remote/controller.rs b/crates/fluvio-spu/src/mirroring/remote/controller.rs index 9e5e734755..b15172760b 100644 --- a/crates/fluvio-spu/src/mirroring/remote/controller.rs +++ b/crates/fluvio-spu/src/mirroring/remote/controller.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use fluvio::config::TlsPolicy; use futures_util::StreamExt; use tokio::select; use tracing::{debug, error, warn, instrument}; @@ -23,7 +24,7 @@ use fluvio_storage::{ReplicaStorage, FileReplica}; use fluvio_socket::{FluvioSocket, FluvioSink}; use fluvio_spu_schema::{Isolation, server::mirror::StartMirrorRequest}; -use fluvio_future::{task::spawn, timer::sleep}; +use fluvio_future::{net::DomainConnector, task::spawn, timer::sleep}; use fluvio_protocol::{record::Offset, api::RequestMessage}; use fluvio_types::event::offsets::OffsetChangeListener; @@ -211,7 +212,7 @@ where (home_socket, tls): (FluvioSocket, bool), backoff: &mut ExponentialBackoff, ) -> Result<()> { - // debug!(home = self.home, "start syncing mirror"); + debug!(home_id = home.id, "start syncing mirror"); let (mut home_sink, mut home_stream) = home_socket.split(); @@ -220,7 +221,6 @@ where home_sink.disable_zerocopy(); } - // let mut home_api_stream = home_stream.api_stream::(); self.send_initial_request(home, &mut home_sink).await?; @@ -352,7 +352,7 @@ where } /// look up home cluster from local store - /// this may retur None if remote cluster is send by SC by time controller is started + /// this may return None if remote cluster is send by SC by time controller is started fn find_home_cluster(&self) -> Option { let read = self.mirror_store.read(); let mirror = read.get(&self.remote_config.home_cluster).cloned(); @@ -424,13 +424,14 @@ where } /// create socket to home, this will always succeed - #[instrument] + #[instrument(skip(self, home))] async fn create_socket_to_home( &self, backoff: &mut ExponentialBackoff, - _home: &Home, + home: &Home, ) -> (FluvioSocket, bool) { - //TODO: implement tls + let tlspolicy = option_tlspolicy(home); + loop { self.state.metrics.increase_conn_count(); @@ -441,12 +442,28 @@ where "trying connect to home", ); - let res = FluvioSocket::connect(endpoint).await; + let res = if let Some(tlspolicy) = &tlspolicy { + match DomainConnector::try_from(tlspolicy.clone()) { + Ok(connector) => { + FluvioSocket::connect_with_connector(endpoint, &(*connector)).await + } + Err(err) => { + error!( + "error establishing tls with leader at: <{}> err: {}", + endpoint, err + ); + self.backoff_and_wait(backoff).await; + continue; + } + } + } else { + FluvioSocket::connect(endpoint).await + }; match res { Ok(socket) => { debug!("connected"); - return (socket, false); + return (socket, tlspolicy.is_some()); } Err(err) => { @@ -473,3 +490,23 @@ fn create_backoff() -> ExponentialBackoff { .build() .unwrap() } + +fn option_tlspolicy(home: &Home) -> Option { + use fluvio::config::{TlsCerts, TlsConfig}; + + let ct = match &home.client_tls { + Some(ct) => ct, + _ => { + return None; + } + }; + + let certs = TlsCerts { + domain: ct.domain.clone(), + key: ct.client_key.clone(), + cert: ct.client_cert.clone(), + ca_cert: ct.ca_cert.clone(), + }; + let tlscfg = TlsConfig::Inline(certs); + Some(TlsPolicy::from(tlscfg)) +} diff --git a/crates/fluvio-spu/src/mirroring/test/integration.rs b/crates/fluvio-spu/src/mirroring/test/integration.rs index 2fcf7a928a..22db973c7d 100644 --- a/crates/fluvio-spu/src/mirroring/test/integration.rs +++ b/crates/fluvio-spu/src/mirroring/test/integration.rs @@ -1,12 +1,13 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; +use fluvio_auth::root::RootAuthorization; use tracing::debug; use fluvio_controlplane_metadata::partition::{RemotePartitionConfig, HomePartitionConfig}; use fluvio_future::timer::sleep; use fluvio_protocol::{fixture::create_raw_recordset, record::ReplicaKey}; -use crate::services::public::create_public_server; +use crate::services::{auth::SpuAuthGlobalContext, public::create_public_server}; use super::fixture::{ReplicaConfig, local_port}; @@ -64,7 +65,10 @@ async fn test_mirroring_new_records() { // start home server debug!("starting home server"); - let _remote_end = create_public_server(home_port.clone(), home_gctx.clone()).run(); + + let auth_global_ctx = + SpuAuthGlobalContext::new(home_gctx.clone(), Arc::new(RootAuthorization::new())); + let _remote_end = create_public_server(home_port.to_owned(), auth_global_ctx.clone()).run(); // sleep 1 seconds debug!("waiting for home public server to up"); diff --git a/crates/fluvio-spu/src/services/auth/mod.rs b/crates/fluvio-spu/src/services/auth/mod.rs new file mode 100644 index 0000000000..1cf6476800 --- /dev/null +++ b/crates/fluvio-spu/src/services/auth/mod.rs @@ -0,0 +1,37 @@ +pub use common::*; + +mod common { + + use std::sync::Arc; + use std::fmt::Debug; + + use crate::core::DefaultSharedGlobalContext; + + /// SPU global context with authorization + /// auth is trait object which contains global auth auth policy + #[derive(Clone, Debug)] + pub struct SpuAuthGlobalContext { + pub global_ctx: DefaultSharedGlobalContext, + pub auth: Arc, + } + + impl SpuAuthGlobalContext { + pub fn new(global_ctx: DefaultSharedGlobalContext, auth: Arc) -> Self { + Self { global_ctx, auth } + } + } + + /// Auth Service Context, this hold individual context that is enough enforce auth + /// for this service context + #[derive(Debug, Clone)] + pub struct SpuAuthServiceContext { + pub global_ctx: DefaultSharedGlobalContext, + pub auth: AC, + } + + impl SpuAuthServiceContext { + pub fn new(global_ctx: DefaultSharedGlobalContext, auth: AC) -> Self { + Self { global_ctx, auth } + } + } +} diff --git a/crates/fluvio-spu/src/services/mod.rs b/crates/fluvio-spu/src/services/mod.rs index 70642db4d4..fbda948ebf 100644 --- a/crates/fluvio-spu/src/services/mod.rs +++ b/crates/fluvio-spu/src/services/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod public; +pub mod auth; pub mod internal; pub use self::internal::create_internal_server; diff --git a/crates/fluvio-spu/src/services/public/mod.rs b/crates/fluvio-spu/src/services/public/mod.rs index b0f3f1ea81..ae3b1c1609 100644 --- a/crates/fluvio-spu/src/services/public/mod.rs +++ b/crates/fluvio-spu/src/services/public/mod.rs @@ -12,6 +12,7 @@ mod conn_context; use std::sync::Arc; use async_trait::async_trait; +use fluvio_auth::Authorization; use fluvio_protocol::api::Request; use fluvio_protocol::api::RequestMessage; use fluvio_protocol::link::ErrorCode; @@ -29,6 +30,8 @@ use fluvio_types::event::StickyEvent; use crate::core::DefaultSharedGlobalContext; use crate::mirroring::home::connection::MirrorHomeHandler; +use crate::services::auth::SpuAuthGlobalContext; +use crate::services::auth::SpuAuthServiceContext; use crate::services::public::consumer_handler::handle_delete_consumer_offset_request; use crate::services::public::consumer_handler::handle_fetch_consumer_offsets_request; use crate::services::public::consumer_handler::handle_update_consumer_offset_request; @@ -39,54 +42,79 @@ use self::offset_request::handle_offset_request; use self::offset_update::handle_offset_update; use self::stream_fetch::{StreamFetchHandler, publishers::StreamPublishers}; use self::conn_context::ConnectionContext; +use std::fmt::Debug; -pub(crate) type SpuPublicServer = - FluvioApiServer; +pub(crate) type SpuPublicServer = + FluvioApiServer, PublicService>; -pub fn create_public_server(addr: String, ctx: DefaultSharedGlobalContext) -> SpuPublicServer { +pub fn create_public_server( + addr: String, + auth_ctx: SpuAuthGlobalContext, +) -> SpuPublicServer +where + A: Authorization + Sync + Send + Debug + 'static, + SpuAuthGlobalContext: Clone + Debug, + ::Context: Send + Sync, +{ info!( - spu_id = ctx.local_spu_id(), + spu_id = auth_ctx.global_ctx.local_spu_id(), %addr, "Starting SPU public service:", ); - FluvioApiServer::new(addr, ctx, PublicService::new()) + FluvioApiServer::new(addr, auth_ctx, PublicService::::new()) } #[derive(Debug)] -pub struct PublicService { - _0: (), // Prevent construction +pub struct PublicService { + data: std::marker::PhantomData, } -impl PublicService { +impl PublicService { pub fn new() -> Self { - PublicService { _0: () } + PublicService { + data: std::marker::PhantomData, + } } } #[async_trait] -impl FluvioService for PublicService { +impl FluvioService for PublicService +where + A: Authorization + Send + Sync, + ::Context: Send + Sync, +{ type Request = SpuServerRequest; - type Context = DefaultSharedGlobalContext; + type Context = SpuAuthGlobalContext; #[instrument(skip(self, context))] async fn respond( self: Arc, - context: DefaultSharedGlobalContext, - socket: FluvioSocket, + context: Self::Context, + mut socket: FluvioSocket, _connection: ConnectInfo, ) -> Result<()> { - let (sink, mut stream) = socket.split(); - + let auth_context = context + .auth + .create_auth_context(&mut socket) + .await + .map_err(|err| { + let io_error: std::io::Error = err.into(); + io_error + })?; + let service_context = SpuAuthServiceContext::new(context.global_ctx.clone(), auth_context); let mut mirror_request: Option> = None; let shutdown = StickyEvent::shared(); - + let (sink, mut stream) = socket.split(); let mut shared_sink = sink.as_shared(); + { let api_stream = stream.api_stream::(); let mut event_stream = api_stream.take_until(shutdown.listen_pinned()); let mut conn_ctx = ConnectionContext::new(); + let context = &context.global_ctx; + loop { let event = event_stream.next().await; match event { @@ -188,7 +216,7 @@ impl FluvioService for PublicService { } if let Some(request) = mirror_request { - MirrorHomeHandler::respond(context, request, shared_sink, &mut stream).await; + MirrorHomeHandler::respond(request, &service_context, shared_sink, stream).await; } shutdown.notify(); diff --git a/crates/fluvio-spu/src/services/public/tests/mod.rs b/crates/fluvio-spu/src/services/public/tests/mod.rs index 3f6d9bf074..59ad37295f 100644 --- a/crates/fluvio-spu/src/services/public/tests/mod.rs +++ b/crates/fluvio-spu/src/services/public/tests/mod.rs @@ -5,6 +5,7 @@ use std::{ use chrono::Utc; use flate2::{bufread::GzEncoder, Compression}; +use fluvio_auth::root::RootAuthorization; use fluvio_controlplane::spu_api::update_smartmodule::SmartModule; use fluvio_controlplane_metadata::smartmodule::{ SmartModuleSpec, SmartModuleWasm, SmartModuleWasmFormat, @@ -14,9 +15,11 @@ use fluvio_protocol::{ record::{RecordData, Record, RecordSet, Batch, RawRecords}, ByteBuf, }; -use fluvio_storage::ReplicaStorage; +use fluvio_storage::{FileReplica, ReplicaStorage}; -use crate::core::GlobalContext; +use crate::{core::GlobalContext, services::auth::SpuAuthGlobalContext}; + +use super::{create_public_server, SpuPublicServer}; mod stream_fetch; mod produce; @@ -109,3 +112,12 @@ fn load_wasm_module(ctx: &GlobalContext, module_name: &str }, }); } + +fn create_public_server_with_root_auth( + addr: String, + ctx: Arc>, +) -> SpuPublicServer { + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + create_public_server(addr.to_owned(), auth_global_ctx.clone()) +} diff --git a/crates/fluvio-spu/src/services/public/tests/produce.rs b/crates/fluvio-spu/src/services/public/tests/produce.rs index 4189461a45..a451795882 100644 --- a/crates/fluvio-spu/src/services/public/tests/produce.rs +++ b/crates/fluvio-spu/src/services/public/tests/produce.rs @@ -27,13 +27,11 @@ use flv_util::fixture::ensure_clean_dir; use crate::{ config::SpuConfig, core::GlobalContext, - services::public::{ - create_public_server, - tests::{ - create_filter_records, vec_to_raw_batch, load_wasm_module, create_filter_raw_records, - }, - }, replication::leader::LeaderReplicaState, + services::public::tests::{ + create_filter_raw_records, create_filter_records, create_public_server_with_root_auth, + load_wasm_module, vec_to_raw_batch, + }, }; #[fluvio_future::test(ignore)] @@ -47,7 +45,7 @@ async fn test_produce_basic() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -160,7 +158,7 @@ async fn test_produce_invalid_compression() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -234,7 +232,8 @@ async fn test_produce_request_timed_out() { let (leader_ctx, _) = config.leader_replica().await; - let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + let server_end_event = + create_public_server_with_root_auth(public_addr.to_owned(), leader_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -296,7 +295,8 @@ async fn test_produce_not_waiting_replication() { let (leader_ctx, _) = config.leader_replica().await; let public_addr = config.leader_public_addr(); - let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + let server_end_event = + create_public_server_with_root_auth(public_addr.to_owned(), leader_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -356,7 +356,8 @@ async fn test_produce_waiting_replication() { let public_addr = config.leader_public_addr(); let public_server_end_event = - create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + create_public_server_with_root_auth(public_addr.to_owned(), leader_ctx.clone()).run(); + let private_server_end_event = create_internal_server(config.leader_addr(), leader_ctx.clone()).run(); @@ -425,7 +426,7 @@ async fn test_produce_metrics() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -537,7 +538,7 @@ async fn test_produce_basic_with_smartmodule_with_lookback() { smartmodule.params.set_lookback(Some(Lookback::last(1))); let mut smartmodules = vec![smartmodule]; - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -848,7 +849,7 @@ async fn test_produce_with_deduplication() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1038,7 +1039,7 @@ async fn test_produce_smart_engine_memory_overfow() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1124,7 +1125,7 @@ async fn test_dedup_init_smart_engine_memory_overfow() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; diff --git a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs index 311d985fba..9f182f9d19 100644 --- a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs +++ b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs @@ -34,14 +34,15 @@ use fluvio_spu_schema::{ fetch::DefaultFetchRequest, }; use fluvio_spu_schema::server::stream_fetch::DefaultStreamFetchRequest; -use crate::services::public::tests::{vec_to_batch, create_filter_raw_records}; +use crate::services::public::tests::{ + create_filter_raw_records, create_public_server_with_root_auth, vec_to_batch, +}; use crate::{ core::GlobalContext, services::public::tests::{create_filter_records, vec_to_raw_batch}, }; use crate::config::SpuConfig; use crate::replication::leader::LeaderReplicaState; -use crate::services::public::create_public_server; use fluvio_protocol::{api::RequestMessage, record::RecordSet}; @@ -58,7 +59,7 @@ async fn test_stream_fetch_basic() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -394,7 +395,7 @@ async fn test_stream_fetch_filter( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -585,7 +586,7 @@ async fn test_stream_fetch_filter_individual( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -711,7 +712,7 @@ async fn test_stream_filter_error_fetch( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -846,7 +847,7 @@ async fn test_stream_filter_max( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -992,7 +993,7 @@ async fn test_stream_fetch_map( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1147,7 +1148,7 @@ async fn test_stream_fetch_map_chain( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1307,7 +1308,7 @@ async fn test_stream_fetch_map_error( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1445,7 +1446,7 @@ async fn test_stream_aggregate_fetch_single_batch( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1591,7 +1592,7 @@ async fn test_stream_aggregate_fetch_multiple_batch( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1729,7 +1730,7 @@ async fn test_stream_fetch_and_new_request( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1818,7 +1819,7 @@ async fn test_stream_fetch_array_map( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1940,7 +1941,7 @@ async fn test_stream_fetch_filter_map( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2065,7 +2066,7 @@ async fn test_stream_fetch_filter_with_params( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2257,7 +2258,7 @@ async fn test_stream_fetch_invalid_smartmodule( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2316,7 +2317,7 @@ async fn test_stream_metrics() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2494,7 +2495,7 @@ async fn stream_fetch_filter_lookback( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2655,7 +2656,7 @@ async fn stream_fetch_filter_lookback_age( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -3037,7 +3038,7 @@ async fn test_stream_fetch_sends_topic_delete_error_on_topic_delete() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let server_end_event = create_public_server_with_root_auth(addr.to_owned(), ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; diff --git a/crates/fluvio-spu/src/start.rs b/crates/fluvio-spu/src/start.rs index 3b6e9dd470..08c66360c1 100644 --- a/crates/fluvio-spu/src/start.rs +++ b/crates/fluvio-spu/src/start.rs @@ -1,9 +1,12 @@ +use std::sync::Arc; + +use fluvio_auth::root::RootAuthorization; use fluvio_storage::FileReplica; use crate::config::{SpuConfig, SpuOpt}; +use crate::services::auth::SpuAuthGlobalContext; use crate::services::create_internal_server; -use crate::services::internal::InternalApiServer; -use crate::services::public::{SpuPublicServer, create_public_server}; +use crate::services::public::create_public_server; use crate::core::DefaultSharedGlobalContext; use crate::core::GlobalContext; use crate::control_plane::ScDispatcher; @@ -41,10 +44,7 @@ pub fn main_loop(opt: SpuOpt) { info!(uptime = sys.uptime(), "Uptime in secs"); run_block_on(async move { - let (ctx, internal_server, public_server) = create_services(spu_config.clone(), true, true); - - let _public_shutdown = internal_server.unwrap().run(); - let _private_shutdown = public_server.unwrap().run(); + let ctx = create_services(spu_config.clone(), true, true); init_monitoring(ctx); @@ -66,50 +66,47 @@ pub fn create_services( local_spu: SpuConfig, internal: bool, public: bool, -) -> ( - DefaultSharedGlobalContext, - Option, - Option, -) { +) -> DefaultSharedGlobalContext { let ctx = FileReplicaContext::new_shared_context(local_spu); let public_ep_addr = ctx.config().public_socket_addr().to_owned(); let private_ep_addr = ctx.config().private_socket_addr().to_owned(); - let public_server = if public { - Some(create_public_server(public_ep_addr, ctx.clone())) - } else { - None + if public { + let authorization = Arc::new(RootAuthorization::new()); + let auth_global_ctx = SpuAuthGlobalContext::new(ctx.clone(), authorization); + let pub_server = create_public_server(public_ep_addr, auth_global_ctx); + pub_server.run(); }; - let internal_server = if internal { - Some(create_internal_server(private_ep_addr, ctx.clone())) - } else { - None + if internal { + let priv_server = create_internal_server(private_ep_addr, ctx.clone()); + priv_server.run(); }; let sc_dispatcher = ScDispatcher::new(ctx.clone()); sc_dispatcher.run(); - (ctx, internal_server, public_server) + ctx } mod proxy { use std::process; - use tracing::info; use flv_util::print_cli_err; use fluvio_future::openssl::TlsAcceptor; - use crate::config::SpuConfig; use flv_tls_proxy::start as proxy_start; + use crate::config::SpuConfig; + pub async fn start_proxy(config: SpuConfig, acceptor: (TlsAcceptor, String)) { let (tls_acceptor, proxy_addr) = acceptor; let target = config.public_endpoint; info!("starting TLS proxy: {}", proxy_addr); + //TODO: add X509Authenticator if let Err(err) = proxy_start(&proxy_addr, tls_acceptor, target).await { print_cli_err!(err); process::exit(-1);