Skip to content

Commit

Permalink
feat: tls and authorization on SPU mirroring (#4022)
Browse files Browse the repository at this point in the history
* feat: tls and authorization on SPU mirroring

* test: add util function to create public server with root auth
  • Loading branch information
fraidev committed May 23, 2024
1 parent dc47cf7 commit e549af5
Show file tree
Hide file tree
Showing 18 changed files with 318 additions and 152 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fluvio-auth/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod policy;
mod error;

pub mod root;
pub mod x509;

pub use policy::*;
Expand Down
51 changes: 51 additions & 0 deletions crates/fluvio-auth/src/root.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use async_trait::async_trait;
use fluvio_controlplane_metadata::extended::ObjectType;
use fluvio_socket::FluvioSocket;

use crate::{AuthContext, AuthError, Authorization, InstanceAction, TypeAction};

/// Authorization that allows anything
#[derive(Debug, Clone, Default)]
pub struct RootAuthorization {}

#[async_trait]
impl Authorization for RootAuthorization {
type Context = RootAuthContext;

async fn create_auth_context(
&self,
_socket: &mut FluvioSocket,
) -> Result<Self::Context, AuthError> {
Ok(RootAuthContext {})
}
}

impl RootAuthorization {
pub fn new() -> Self {
Self {}
}
}

#[derive(Debug)]
pub struct RootAuthContext {}

#[async_trait]
impl AuthContext for RootAuthContext {
async fn allow_type_action(
&self,
_ty: ObjectType,
_action: TypeAction,
) -> Result<bool, AuthError> {
Ok(true)
}

/// check if specific instance of spec can be deleted
async fn allow_instance_action(
&self,
_ty: ObjectType,
_action: InstanceAction,
_key: &str,
) -> Result<bool, AuthError> {
Ok(true)
}
}
3 changes: 2 additions & 1 deletion crates/fluvio-sc/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ where
mod pub_server {

use std::sync::Arc;
use fluvio_auth::root::RootAuthorization;
use tracing::info;

use crate::services::start_public_server;
use crate::core::SharedContext;

use fluvio_controlplane_metadata::core::MetadataItem;
use crate::services::auth::{AuthGlobalContext, RootAuthorization, ReadOnlyAuthorization};
use crate::services::auth::{AuthGlobalContext, ReadOnlyAuthorization};
use crate::services::auth::basic::{BasicAuthorization, BasicRbacPolicy};

pub fn start<C>(ctx: SharedContext<C>, auth_policy_option: Option<BasicRbacPolicy>)
Expand Down
51 changes: 2 additions & 49 deletions crates/fluvio-sc/src/services/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,53 +33,6 @@ mod common {
}
}

/// Authorization that allows anything
/// Used for personal development
#[derive(Debug, Clone)]
pub struct RootAuthorization {}

#[async_trait]
impl Authorization for RootAuthorization {
type Context = RootAuthContext;

async fn create_auth_context(
&self,
_socket: &mut FluvioSocket,
) -> Result<Self::Context, AuthError> {
Ok(RootAuthContext {})
}
}

impl RootAuthorization {
pub fn new() -> Self {
Self {}
}
}

#[derive(Debug)]
pub struct RootAuthContext {}

#[async_trait]
impl AuthContext for RootAuthContext {
async fn allow_type_action(
&self,
_ty: ObjectType,
_action: TypeAction,
) -> Result<bool, AuthError> {
Ok(true)
}

/// check if specific instance of spec can be deleted
async fn allow_instance_action(
&self,
_ty: ObjectType,
_action: InstanceAction,
_key: &str,
) -> Result<bool, AuthError> {
Ok(true)
}
}

/// Auth Service Context, this hold individual context that is enough enforce auth
/// for this service context
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -145,9 +98,9 @@ mod common {

#[cfg(test)]
mod test {
use fluvio_auth::AuthContext;
use fluvio_auth::{root::RootAuthContext, AuthContext};

use super::{ObjectType, ReadOnlyAuthContext, RootAuthContext, TypeAction};
use super::{ObjectType, ReadOnlyAuthContext, TypeAction};

/// test read only context
/// read only context allows read on everything
Expand Down
3 changes: 1 addition & 2 deletions crates/fluvio-sc/src/services/public_api/smartmodule/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,14 @@ mod test {

use std::sync::Arc;

use fluvio_auth::root::RootAuthContext;
use fluvio_stream_dispatcher::store::StoreContext;
use fluvio_stream_model::fixture::TestMeta;
use fluvio_stream_model::store::{MetadataStoreObject, LocalStore};
use fluvio_controlplane_metadata::smartmodule::{
SmartModuleSpec, SmartModuleMetadata, SmartModulePackage, FluvioSemVersion,
};

use crate::services::auth::RootAuthContext;

use super::fetch_smart_modules;

type TestSmartModuleStore = LocalStore<SmartModuleSpec, TestMeta>;
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-spu/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mimalloc = { workspace = true }

# Fluvio dependencies
fluvio = { workspace = true }
fluvio-auth = { workspace = true }
fluvio-types = { workspace = true, features = ["events"] }
fluvio-storage = { workspace = true, features = ["iterators"] }
fluvio-compression = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-spu/src/config/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::io::Error as IoError;
use std::process;
use std::io::ErrorKind;

use fluvio_future::openssl::SslVerifyMode;
use tracing::debug;
use tracing::info;
use clap::Parser;
Expand Down Expand Up @@ -178,6 +179,7 @@ impl SpuOpt {
.ok_or_else(|| IoError::new(ErrorKind::NotFound, "missing ca cert"))?;
TlsAcceptor::builder()
.map_err(|err| err.into_io_error())?
.with_ssl_verify_mode(SslVerifyMode::PEER)
.with_ca_from_pem_file(ca_path)
.map_err(|err| err.into_io_error())?
} else {
Expand Down
59 changes: 49 additions & 10 deletions crates/fluvio-spu/src/mirroring/home/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ use tokio::select;
use tracing::{debug, error, instrument, warn};
use anyhow::Result;

use fluvio_auth::{AuthContext, InstanceAction};
use fluvio_controlplane_metadata::mirror::MirrorType;
use fluvio_controlplane_metadata::extended::ObjectType;
use fluvio_future::timer::sleep;
use fluvio_protocol::api::RequestMessage;
use fluvio_spu_schema::server::mirror::StartMirrorRequest;
use futures_util::StreamExt;
use fluvio_socket::{FluvioStream, ExclusiveFlvSink};
use fluvio_socket::{ExclusiveFlvSink, FluvioStream};

use crate::core::DefaultSharedGlobalContext;
use crate::mirroring::remote::api_key::MirrorRemoteApiEnum;
use crate::mirroring::remote::remote_api::RemoteMirrorRequest;
use crate::mirroring::remote::sync::DefaultPartitionSyncRequest;
use crate::replication::leader::SharedFileLeaderState;
use crate::services::auth::SpuAuthServiceContext;

use super::update_offsets::UpdateHomeOffsetRequest;

Expand Down Expand Up @@ -59,18 +63,55 @@ impl fmt::Debug for MirrorHomeHandler {
impl MirrorHomeHandler {
/// start handling mirror request sync from remote
/// it is called from public service handler
pub(crate) async fn respond(
ctx: DefaultSharedGlobalContext,
pub(crate) async fn respond<AC: AuthContext>(
req_msg: RequestMessage<StartMirrorRequest>,
auth_ctx: &SpuAuthServiceContext<AC>,
sink: ExclusiveFlvSink,
stream: &mut FluvioStream,
stream: FluvioStream,
) {
// authorization check
if let Ok(authorized) = auth_ctx
.auth
.allow_instance_action(
ObjectType::Mirror,
InstanceAction::Update,
&req_msg.request.remote_cluster_id,
)
.await
{
if !authorized {
warn!(
"identity mismatch for remote_id: {}",
req_msg.request.remote_cluster_id
);
return;
}
}

// check if remote cluster exists
let mirrors = auth_ctx.global_ctx.mirrors_localstore().all_values();
let remote = mirrors
.iter()
.find(|mirror| match &mirror.spec.mirror_type {
MirrorType::Remote(r) => r.id == req_msg.request.remote_cluster_id,
_ => false,
});

if remote.is_none() {
warn!(
"remote cluster not found: {}",
req_msg.request.remote_cluster_id
);
return;
}

debug!("handling mirror request: {:#?}", req_msg);
let remote_replica = req_msg.request.remote_replica;
let remote_cluster_id = req_msg.request.remote_cluster_id;
let _access_key = req_msg.request.access_key;

if let Some(leader) = ctx
if let Some(leader) = auth_ctx
.global_ctx
.leaders_state()
.find_mirror_home_leader(&remote_cluster_id, &remote_replica)
.await
Expand All @@ -79,21 +120,19 @@ impl MirrorHomeHandler {
// map to actual home
let metrics = Arc::new(MirrorRequestMetrics::new());

// TODO: perform authorization
let handler: MirrorHomeHandler = Self {
metrics: metrics.clone(),
leader,
ctx,
ctx: auth_ctx.global_ctx.clone(),
};

if let Err(err) = handler.inner_respond(sink, stream).await {
error!("error handling mirror request: {:#?}", err);
}
} else {
// TODO: handle no home partition
warn!(
remote_replica,
remote_cluster_id, "no leader replica found for this"
remote_cluster_id, "no leader replica found for this mirror request"
);
}
}
Expand All @@ -102,7 +141,7 @@ impl MirrorHomeHandler {
async fn inner_respond(
self,
mut sink: ExclusiveFlvSink,
stream: &mut FluvioStream,
mut stream: FluvioStream,
) -> Result<()> {
// first send
let mut api_stream = stream.api_stream::<RemoteMirrorRequest, MirrorRemoteApiEnum>();
Expand Down
Loading

0 comments on commit e549af5

Please sign in to comment.