Skip to content

Commit

Permalink
feat: tls and authorization on SC mirroring (#4017)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed May 22, 2024
1 parent 9bf976d commit 4ba088a
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/fluvio-auth/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ pub enum TypeAction {

pub enum InstanceAction {
Delete,
Update,
}

#[async_trait]
pub trait AuthContext: Debug {
pub trait AuthContext: Debug + Send + Sync + 'static {
/// check if any allow type specific action can be allowed
async fn allow_type_action(
&self,
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-sc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tracing = { workspace = true }


# Fluvio dependencies
fluvio = { workspace = true }
fluvio-auth = { workspace = true }
fluvio-future = { workspace = true, features = [
"subscriber",
Expand Down
56 changes: 52 additions & 4 deletions crates/fluvio-sc/src/controllers/mirroring/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::time::{Duration, SystemTime};
use tracing::{debug, error, info, instrument};
use anyhow::{anyhow, Result};

use fluvio::config::TlsPolicy;
use fluvio_socket::{ClientConfig, MultiplexerSocket, StreamSocket};
use futures_util::StreamExt;
use fluvio_future::{task::spawn, timer::sleep};
use fluvio_future::{net::DomainConnector, task::spawn, timer::sleep};
use fluvio_sc_schema::{
core::MetadataItem,
mirror::{ConnectionStatus, MirrorPairStatus, MirrorSpec, MirrorStatus, MirrorType},
mirror::{ConnectionStatus, Home, MirrorPairStatus, MirrorSpec, MirrorStatus, MirrorType},
mirroring::ObjectMirroringRequest,
topic::{MirrorConfig, RemoteMirrorConfig, ReplicaSpec, SpuMirrorConfig, TopicSpec},
TryEncodableFrom,
Expand Down Expand Up @@ -72,9 +73,36 @@ impl<C: MetadataItem> RemoteMirrorController<C> {

if let Some(home) = home_mirrors.first() {
//send to home cluster the connect request
//TODO: handle TLS
let tlspolicy = option_tlspolicy(home);

// handling tls
let home_config = if let Some(tlspolicy) = &tlspolicy {
match DomainConnector::try_from(tlspolicy.clone()) {
Ok(connector) => {
ClientConfig::new(home.public_endpoint.clone(), connector, false)
}
Err(err) => {
error!(
"error establishing tls with leader at: <{}> err: {}",
home.public_endpoint.clone(),
err
);
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_millis();
let status = MirrorStatus::new(
MirrorPairStatus::Failed,
ConnectionStatus::Online,
now as u64,
);
self.mirrors.update_status(home.id.clone(), status).await?;
return Err(err.into());
}
}
} else {
ClientConfig::with_addr(home.public_endpoint.clone())
};

let home_config = ClientConfig::with_addr(home.public_endpoint.clone());
let versioned_socket = home_config.connect().await?;
let (socket, config, versions) = versioned_socket.split();
info!("connecting to home: {}", home.public_endpoint);
Expand Down Expand Up @@ -195,3 +223,23 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
}
}
}

fn option_tlspolicy(home: &Home) -> Option<TlsPolicy> {
use fluvio::config::{TlsCerts, TlsConfig};

let ct = match &home.client_tls {
Some(ct) => ct,
_ => {
return None;
}
};

let certs = TlsCerts {
domain: ct.domain.clone(),
key: ct.client_key.clone(),
cert: ct.client_cert.clone(),
ca_cert: ct.ca_cert.clone(),
};
let tlscfg = TlsConfig::Inline(certs);
Some(TlsPolicy::from(tlscfg))
}
1 change: 1 addition & 0 deletions crates/fluvio-sc/src/services/auth/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ mod policy {
fn from(action: InstanceAction) -> Self {
match action {
InstanceAction::Delete => Action::Delete,
InstanceAction::Update => Action::Update,
}
}
}
Expand Down
75 changes: 55 additions & 20 deletions crates/fluvio-sc/src/services/public_api/mirroring/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,54 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use fluvio_controlplane_metadata::mirroring::{
MirrorConnect, MirroringSpecWrapper, MirroringStatusResponse,
use tracing::{debug, error, info, instrument, trace, warn};
use anyhow::{Result, anyhow};

use fluvio_auth::{AuthContext, InstanceAction};
use fluvio_controlplane_metadata::{
extended::ObjectType,
mirroring::{MirrorConnect, MirroringSpecWrapper, MirroringStatusResponse},
};
use fluvio_future::{task::spawn, timer::sleep};
use fluvio_protocol::api::{RequestHeader, ResponseMessage};
use fluvio_sc_schema::{
core::MetadataItem,
mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus},
mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus, MirrorType},
spu::SpuSpec,
store::ChangeListener,
topic::{MirrorConfig, ReplicaSpec, TopicSpec},
};
use fluvio_socket::ExclusiveFlvSink;
use fluvio_types::event::StickyEvent;
use tracing::{debug, error, info, instrument, trace};
use anyhow::{Result, anyhow};

use crate::core::Context;
use crate::services::auth::AuthServiceContext;

// This is the entry point for handling mirroring requests
// Home clusters will receive requests from remote clusters
pub struct RemoteFetchingFromHomeController<C: MetadataItem> {
pub struct RemoteFetchingFromHomeController<AC: AuthContext, C: MetadataItem> {
req: MirrorConnect,
response_sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
ctx: Arc<Context<C>>,
header: RequestHeader,
auth_ctx: Arc<AuthServiceContext<AC, C>>,
}

const MIRRORING_CONTROLLER_INTERVAL: u64 = 5;

impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
impl<AC: AuthContext, C: MetadataItem> RemoteFetchingFromHomeController<AC, C> {
pub fn start(
req: MirrorConnect,
response_sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
ctx: Arc<Context<C>>,
header: RequestHeader,
auth_ctx: Arc<AuthServiceContext<AC, C>>,
) {
let controller = Self {
req: req.clone(),
response_sink,
end_event,
ctx,
header,
auth_ctx,
};

spawn(controller.dispatch_loop());
Expand All @@ -55,13 +58,46 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
#[instrument(skip(self), name = "RemoteFetchingFromHomeControllerLoop")]
async fn dispatch_loop(mut self) {
use tokio::select;

// authorization check
if let Ok(authorized) = self
.auth_ctx
.auth
.allow_instance_action(
ObjectType::Mirror,
InstanceAction::Update,
&self.req.remote_id,
)
.await
{
if !authorized {
warn!("identity mismatch for remote_id: {}", self.req.remote_id);
return;
}
}

let ctx = self.auth_ctx.global_ctx.clone();

// check if remote cluster exists
let mirrors = ctx.mirrors().store().value(&self.req.remote_id).await;
let remote = mirrors
.iter()
.find(|mirror| match &mirror.spec.mirror_type {
MirrorType::Remote(r) => r.id == self.req.remote_id,
_ => false,
});

if remote.is_none() {
warn!("remote cluster not found: {}", self.req.remote_id);
return;
}
info!(
name = self.req.remote_id,
"received mirroring connect request"
);

let mut topics_listener = self.ctx.topics().change_listener();
let mut spus_listerner = self.ctx.spus().change_listener();
let mut topics_listener = ctx.topics().change_listener();
let mut spus_listerner = ctx.spus().change_listener();

loop {
if self
Expand Down Expand Up @@ -105,10 +141,11 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
debug!("no changes, skipping");
return Ok(());
}
let ctx = self.auth_ctx.global_ctx.clone();

let spus = self.ctx.spus().store().clone_values().await;
let spus = ctx.spus().store().clone_values().await;
let topics = ctx.topics().store().clone_values().await;

let topics = self.ctx.topics().store().clone_values().await;
let mirror_topics = topics
.into_iter()
.filter_map(|topic| match topic.spec.replicas() {
Expand Down Expand Up @@ -150,7 +187,7 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
})
.collect::<Vec<_>>();

match self.ctx.mirrors().store().value(&self.req.remote_id).await {
match ctx.mirrors().store().value(&self.req.remote_id).await {
Some(remote) => {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand All @@ -173,8 +210,7 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
now as u64,
);

self.ctx
.mirrors()
ctx.mirrors()
.update_status(remote.key.clone(), status)
.await?;
error!(
Expand All @@ -192,8 +228,7 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
ConnectionStatus::Online,
now as u64,
);
self.ctx
.mirrors()
ctx.mirrors()
.update_status(remote.key.clone(), status)
.await?;

Expand Down
5 changes: 2 additions & 3 deletions crates/fluvio-sc/src/services/public_api/mirroring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@ pub enum MirrorRequests {
#[instrument(skip(request, auth_ctx, sink, end_event))]
pub fn handle_mirroring_request<AC: AuthContext, C: MetadataItem>(
request: RequestMessage<ObjectMirroringRequest>,
auth_ctx: &AuthServiceContext<AC, C>,
auth_ctx: Arc<AuthServiceContext<AC, C>>,
sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
) -> Result<()> {
info!("remote cluster register request {:?}", request);

let (header, req) = request.get_header_request();
let ctx = auth_ctx.global_ctx.clone();

let Ok(req) = try_convert_to_reqs(req) else {
return Err(anyhow!("unable to decode request"));
};

match req {
MirrorRequests::Connect(req) => {
RemoteFetchingFromHomeController::start(req, sink, end_event, ctx, header);
RemoteFetchingFromHomeController::start(req, sink, end_event, header, auth_ctx);
}
};

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc/src/services/public_api/public_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ where
"list handler"
),
AdminPublicDecodedRequest::MirroringRequest(request) =>
super::mirroring::handle_mirroring_request(request, &service_context, shared_sink.clone(), end_event.clone())?,
super::mirroring::handle_mirroring_request(request, service_context.clone(), shared_sink.clone(), end_event.clone())?,
AdminPublicDecodedRequest::WatchRequest(request) =>
super::watch::handle_watch_request(
request,
Expand Down

0 comments on commit 4ba088a

Please sign in to comment.