Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tls and authorization on SC mirroring #4017

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/fluvio-auth/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ pub enum TypeAction {

pub enum InstanceAction {
Delete,
Update,
}

#[async_trait]
pub trait AuthContext: Debug {
pub trait AuthContext: Debug + Send + Sync + 'static {
/// check if any allow type specific action can be allowed
async fn allow_type_action(
&self,
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-sc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tracing = { workspace = true }


# Fluvio dependencies
fluvio = { workspace = true }
fluvio-auth = { workspace = true }
fluvio-future = { workspace = true, features = [
"subscriber",
Expand Down
56 changes: 52 additions & 4 deletions crates/fluvio-sc/src/controllers/mirroring/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::time::{Duration, SystemTime};
use tracing::{debug, error, info, instrument};
use anyhow::{anyhow, Result};

use fluvio::config::TlsPolicy;
use fluvio_socket::{ClientConfig, MultiplexerSocket, StreamSocket};
use futures_util::StreamExt;
use fluvio_future::{task::spawn, timer::sleep};
use fluvio_future::{net::DomainConnector, task::spawn, timer::sleep};
use fluvio_sc_schema::{
core::MetadataItem,
mirror::{ConnectionStatus, MirrorPairStatus, MirrorSpec, MirrorStatus, MirrorType},
mirror::{ConnectionStatus, Home, MirrorPairStatus, MirrorSpec, MirrorStatus, MirrorType},
mirroring::ObjectMirroringRequest,
topic::{MirrorConfig, RemoteMirrorConfig, ReplicaSpec, SpuMirrorConfig, TopicSpec},
TryEncodableFrom,
Expand Down Expand Up @@ -72,9 +73,36 @@ impl<C: MetadataItem> RemoteMirrorController<C> {

if let Some(home) = home_mirrors.first() {
//send to home cluster the connect request
//TODO: handle TLS
let tlspolicy = option_tlspolicy(home);

// handling tls
let home_config = if let Some(tlspolicy) = &tlspolicy {
match DomainConnector::try_from(tlspolicy.clone()) {
Ok(connector) => {
ClientConfig::new(home.public_endpoint.clone(), connector, false)
}
Err(err) => {
error!(
"error establishing tls with leader at: <{}> err: {}",
home.public_endpoint.clone(),
err
);
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_millis();
let status = MirrorStatus::new(
MirrorPairStatus::Failed,
ConnectionStatus::Online,
now as u64,
);
self.mirrors.update_status(home.id.clone(), status).await?;
return Err(err.into());
}
}
} else {
ClientConfig::with_addr(home.public_endpoint.clone())
};

let home_config = ClientConfig::with_addr(home.public_endpoint.clone());
let versioned_socket = home_config.connect().await?;
let (socket, config, versions) = versioned_socket.split();
info!("connecting to home: {}", home.public_endpoint);
Expand Down Expand Up @@ -195,3 +223,23 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
}
}
}

fn option_tlspolicy(home: &Home) -> Option<TlsPolicy> {
use fluvio::config::{TlsCerts, TlsConfig};

let ct = match &home.client_tls {
Some(ct) => ct,
_ => {
return None;
}
};

let certs = TlsCerts {
domain: ct.domain.clone(),
key: ct.client_key.clone(),
cert: ct.client_cert.clone(),
ca_cert: ct.ca_cert.clone(),
};
let tlscfg = TlsConfig::Inline(certs);
Some(TlsPolicy::from(tlscfg))
}
1 change: 1 addition & 0 deletions crates/fluvio-sc/src/services/auth/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ mod policy {
fn from(action: InstanceAction) -> Self {
match action {
InstanceAction::Delete => Action::Delete,
InstanceAction::Update => Action::Update,
}
}
}
Expand Down
75 changes: 55 additions & 20 deletions crates/fluvio-sc/src/services/public_api/mirroring/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,54 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use fluvio_controlplane_metadata::mirroring::{
MirrorConnect, MirroringSpecWrapper, MirroringStatusResponse,
use tracing::{debug, error, info, instrument, trace, warn};
use anyhow::{Result, anyhow};

use fluvio_auth::{AuthContext, InstanceAction};
use fluvio_controlplane_metadata::{
extended::ObjectType,
mirroring::{MirrorConnect, MirroringSpecWrapper, MirroringStatusResponse},
};
use fluvio_future::{task::spawn, timer::sleep};
use fluvio_protocol::api::{RequestHeader, ResponseMessage};
use fluvio_sc_schema::{
core::MetadataItem,
mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus},
mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus, MirrorType},
spu::SpuSpec,
store::ChangeListener,
topic::{MirrorConfig, ReplicaSpec, TopicSpec},
};
use fluvio_socket::ExclusiveFlvSink;
use fluvio_types::event::StickyEvent;
use tracing::{debug, error, info, instrument, trace};
use anyhow::{Result, anyhow};

use crate::core::Context;
use crate::services::auth::AuthServiceContext;

// This is the entry point for handling mirroring requests
// Home clusters will receive requests from remote clusters
pub struct RemoteFetchingFromHomeController<C: MetadataItem> {
pub struct RemoteFetchingFromHomeController<AC: AuthContext, C: MetadataItem> {
req: MirrorConnect,
response_sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
ctx: Arc<Context<C>>,
header: RequestHeader,
auth_ctx: Arc<AuthServiceContext<AC, C>>,
}

const MIRRORING_CONTROLLER_INTERVAL: u64 = 5;

impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
impl<AC: AuthContext, C: MetadataItem> RemoteFetchingFromHomeController<AC, C> {
pub fn start(
req: MirrorConnect,
response_sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
ctx: Arc<Context<C>>,
header: RequestHeader,
auth_ctx: Arc<AuthServiceContext<AC, C>>,
) {
let controller = Self {
req: req.clone(),
response_sink,
end_event,
ctx,
header,
auth_ctx,
};

spawn(controller.dispatch_loop());
Expand All @@ -55,13 +58,46 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
#[instrument(skip(self), name = "RemoteFetchingFromHomeControllerLoop")]
async fn dispatch_loop(mut self) {
use tokio::select;

// authorization check
if let Ok(authorized) = self
.auth_ctx
.auth
.allow_instance_action(
ObjectType::Mirror,
InstanceAction::Update,
&self.req.remote_id,
)
.await
{
if !authorized {
warn!("identity mismatch for remote_id: {}", self.req.remote_id);
return;
}
}

let ctx = self.auth_ctx.global_ctx.clone();

// check if remote cluster exists
let mirrors = ctx.mirrors().store().value(&self.req.remote_id).await;
let remote = mirrors
.iter()
.find(|mirror| match &mirror.spec.mirror_type {
MirrorType::Remote(r) => r.id == self.req.remote_id,
_ => false,
});

if remote.is_none() {
warn!("remote cluster not found: {}", self.req.remote_id);
return;
}
info!(
name = self.req.remote_id,
"received mirroring connect request"
);

let mut topics_listener = self.ctx.topics().change_listener();
let mut spus_listerner = self.ctx.spus().change_listener();
let mut topics_listener = ctx.topics().change_listener();
let mut spus_listerner = ctx.spus().change_listener();

loop {
if self
Expand Down Expand Up @@ -105,10 +141,11 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
debug!("no changes, skipping");
return Ok(());
}
let ctx = self.auth_ctx.global_ctx.clone();

let spus = self.ctx.spus().store().clone_values().await;
let spus = ctx.spus().store().clone_values().await;
let topics = ctx.topics().store().clone_values().await;

let topics = self.ctx.topics().store().clone_values().await;
let mirror_topics = topics
.into_iter()
.filter_map(|topic| match topic.spec.replicas() {
Expand Down Expand Up @@ -150,7 +187,7 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
})
.collect::<Vec<_>>();

match self.ctx.mirrors().store().value(&self.req.remote_id).await {
match ctx.mirrors().store().value(&self.req.remote_id).await {
Some(remote) => {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand All @@ -173,8 +210,7 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
now as u64,
);

self.ctx
.mirrors()
ctx.mirrors()
.update_status(remote.key.clone(), status)
.await?;
error!(
Expand All @@ -192,8 +228,7 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
ConnectionStatus::Online,
now as u64,
);
self.ctx
.mirrors()
ctx.mirrors()
.update_status(remote.key.clone(), status)
.await?;

Expand Down
5 changes: 2 additions & 3 deletions crates/fluvio-sc/src/services/public_api/mirroring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@ pub enum MirrorRequests {
#[instrument(skip(request, auth_ctx, sink, end_event))]
pub fn handle_mirroring_request<AC: AuthContext, C: MetadataItem>(
request: RequestMessage<ObjectMirroringRequest>,
auth_ctx: &AuthServiceContext<AC, C>,
auth_ctx: Arc<AuthServiceContext<AC, C>>,
sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
) -> Result<()> {
info!("remote cluster register request {:?}", request);

let (header, req) = request.get_header_request();
let ctx = auth_ctx.global_ctx.clone();

let Ok(req) = try_convert_to_reqs(req) else {
return Err(anyhow!("unable to decode request"));
};

match req {
MirrorRequests::Connect(req) => {
RemoteFetchingFromHomeController::start(req, sink, end_event, ctx, header);
RemoteFetchingFromHomeController::start(req, sink, end_event, header, auth_ctx);
}
};

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc/src/services/public_api/public_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ where
"list handler"
),
AdminPublicDecodedRequest::MirroringRequest(request) =>
super::mirroring::handle_mirroring_request(request, &service_context, shared_sink.clone(), end_event.clone())?,
super::mirroring::handle_mirroring_request(request, service_context.clone(), shared_sink.clone(), end_event.clone())?,
AdminPublicDecodedRequest::WatchRequest(request) =>
super::watch::handle_watch_request(
request,
Expand Down
Loading