Skip to content

Commit

Permalink
Make version required in connector yaml (#2472)
Browse files Browse the repository at this point in the history
Closes #2456.
  • Loading branch information
simlay committed Jul 13, 2022
1 parent 670dc5a commit fd1fa81
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Add performance counters to producer ([#2424](https://github.com/infinyon/fluvio/issues/2424))
* Upgrade to fluvio-future 0.4.0 ([#2470](https://github.com/infinyon/fluvio/pull/2470))
* Add support to detecting smartmodule type from WASM payload on SPU ([#2457](https://github.com/infinyon/fluvio/issues/2457))
* Require `version` field in connector yaml. ([#2472](https://github.com/infinyon/fluvio/pull/2472))

## Platform Version 0.9.30 - 2022-06-29
* Improve CLI error output when log_dir isn't writable ([#2425](https://github.com/infinyon/fluvio/pull/2425))
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-cli/src/connector/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mod output {
Row::from([
Cell::new(&r.name).set_alignment(CellAlignment::Left),
Cell::new(&spec.type_.to_string()).set_alignment(CellAlignment::Left),
Cell::new(&spec.version()).set_alignment(CellAlignment::Left),
Cell::new(&spec.version.to_string()).set_alignment(CellAlignment::Left),
Cell::new(&r.status.to_string()).set_alignment(CellAlignment::Right),
])
})
Expand Down
10 changes: 7 additions & 3 deletions crates/fluvio-cli/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub struct ConnectorConfig {
type_: String,

pub(crate) topic: String,
pub(crate) version: Option<String>,
pub(crate) version: String,

#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
parameters: BTreeMap<String, ManageConnectorParameterValue>,
Expand Down Expand Up @@ -206,7 +206,7 @@ impl From<ConnectorConfig> for ManagedConnectorSpec {
topic: config.topic,
parameters,
secrets: config.secrets,
version: config.version,
version: config.version.into(),
}
}
}
Expand Down Expand Up @@ -263,7 +263,7 @@ impl From<ManagedConnectorSpec> for ConnectorConfig {
name: spec.name,
type_: spec.type_,
topic: spec.topic,
version: spec.version,
version: spec.version.to_string(),
parameters,
secrets: spec.secrets,
producer,
Expand Down Expand Up @@ -346,4 +346,8 @@ fn error_yaml_tests() {
.expect_err("This yaml should error");
#[cfg(unix)]
assert_eq!("Other(\"couldn't parse \\\"aoeu\\\" into a known SI unit, couldn't parse unit of \\\"aoeu\\\"\")", format!("{:?}", connector_cfg));
let connector_cfg = ConnectorConfig::from_file("test-data/connectors/error-version.yaml")
.expect_err("This yaml should error");
#[cfg(unix)]
assert_eq!("ConnectorConfig(Message(\"missing field `version`\", Some(Pos { marker: Marker { index: 4, line: 1, col: 4 }, path: \".\" })))", format!("{:?}", connector_cfg));
}
5 changes: 5 additions & 0 deletions crates/fluvio-cli/test-data/connectors/error-version.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
name: my-test-mqtt
type: mqtt
topic: my-mqtt
create_topic: false
direction: source
6 changes: 3 additions & 3 deletions crates/fluvio-controlplane-metadata/src/connector/k8/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Spec for K8ManagedConnectorSpec {
#[serde(rename_all = "camelCase", default)]
pub struct K8ManagedConnectorSpec {
pub name: String,
pub version: Option<String>,
pub version: String,
#[cfg_attr(feature = "use_serde", serde(rename = "type"))]
pub type_: String, // syslog, github star, slack
pub topic: String,
Expand All @@ -60,7 +60,7 @@ mod convert {
topic: spec.topic,
parameters: spec.parameters,
secrets: spec.secrets,
version: spec.version,
version: spec.version.into(),
}
}
}
Expand All @@ -73,7 +73,7 @@ mod convert {
topic: spec.topic,
parameters: spec.parameters,
secrets: spec.secrets,
version: spec.version,
version: spec.version.to_string(),
}
}
}
Expand Down
79 changes: 72 additions & 7 deletions crates/fluvio-controlplane-metadata/src/connector/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bytes::BufMut;
pub struct ManagedConnectorSpec {
pub name: String,

pub version: Option<String>,
pub version: ConnectorVersionInner,

#[cfg_attr(feature = "use_serde", serde(rename = "type"))]
pub type_: String, // syslog, github star, slack
Expand All @@ -30,6 +30,76 @@ pub struct ManagedConnectorSpec {

pub secrets: BTreeMap<String, SecretString>,
}
#[derive(Debug, PartialEq, Clone)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase", untagged)
)]
pub enum ConnectorVersionInner {
String(String),
Option(Option<String>),
}

impl Default for ConnectorVersionInner {
fn default() -> Self {
Self::String(String::new())
}
}
impl From<String> for ConnectorVersionInner {
fn from(inner: String) -> Self {
Self::String(inner)
}
}
impl ToString for ConnectorVersionInner {
fn to_string(&self) -> String {
match self {
ConnectorVersionInner::String(inner) => inner.to_string(),
ConnectorVersionInner::Option(inner) => {
inner.clone().unwrap_or_else(|| "latest".to_string())
}
}
}
}

impl Decoder for ConnectorVersionInner {
fn decode<T: Buf>(&mut self, src: &mut T, version: Version) -> Result<(), std::io::Error> {
if version >= 9 {
let mut new_string = String::new();
new_string.decode(src, version)?;
*self = ConnectorVersionInner::String(new_string);

Ok(())
} else {
let mut inner: Option<String> = None;
inner.decode(src, version)?;
*self = ConnectorVersionInner::Option(inner);
Ok(())
}
}
}
impl Encoder for ConnectorVersionInner {
fn write_size(&self, version: Version) -> usize {
if version >= 9 {
let inner: String = self.clone().to_string();
inner.write_size(version)
} else {
let inner: Option<String> = Some(self.clone().to_string());
inner.write_size(version)
}
}

/// encoding contents for buffer
fn encode<T: BufMut>(&self, dest: &mut T, version: Version) -> Result<(), std::io::Error> {
if version >= 9 {
let inner: String = self.clone().to_string();
inner.encode(dest, version)
} else {
let inner: Option<String> = Some(self.clone().to_string());
inner.encode(dest, version)
}
}
}

#[derive(Encoder, Decoder, Debug, PartialEq, Clone)]
#[cfg_attr(
Expand Down Expand Up @@ -182,6 +252,7 @@ parameters:
secrets: {}
topic: poc1
type: kafka-sink
version: latest
"#;
use super::ManagedConnectorSpec;
let connector_spec: ManagedConnectorSpec =
Expand Down Expand Up @@ -253,12 +324,6 @@ impl Encoder for ManageConnectorParameterValue {
}
}

impl ManagedConnectorSpec {
pub fn version(&self) -> String {
self.version.clone().unwrap_or_else(|| "latest".to_string())
}
}

#[derive(Encoder, Decoder, Default, PartialEq, Clone)]
#[cfg_attr(
feature = "use_serde",
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/src/objects/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct CommonCreateRequest {

impl Request for ObjectApiCreateRequest {
const API_KEY: u16 = AdminPublicApiKey::Create as u16;
const DEFAULT_API_VERSION: i16 = 8;
const DEFAULT_API_VERSION: i16 = 9;
type Response = Status;
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/src/objects/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ where

impl Request for ObjectApiListRequest {
const API_KEY: u16 = AdminPublicApiKey::List as u16;
const DEFAULT_API_VERSION: i16 = 8;
const DEFAULT_API_VERSION: i16 = 9;
type Response = ObjectApiListResponse;
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/src/objects/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct WatchRequest<S: AdminSpec> {

impl Request for ObjectApiWatchRequest {
const API_KEY: u16 = AdminPublicApiKey::Watch as u16;
const DEFAULT_API_VERSION: i16 = 8;
const DEFAULT_API_VERSION: i16 = 9;
type Response = ObjectApiWatchResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,10 @@ impl ManagedConnectorDeploymentController {

args.extend(parameters);

let (image, image_pull_policy) = match mc_spec.version.as_deref() {
Some("dev") => (image, ImagePullPolicy::Never),
Some("latest") | None => (format!("{}:latest", image), ImagePullPolicy::Always),
Some(version) => (
let (image, image_pull_policy) = match mc_spec.version.to_string().as_str() {
"dev" => (image, ImagePullPolicy::Never),
"latest" => (format!("{}:latest", image), ImagePullPolicy::Always),
version => (
format!("{}:{}", image, version),
ImagePullPolicy::IfNotPresent,
),
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-test/src/tests/smoke/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ pub struct ConnectorConfig {
#[serde(rename = "type")]
type_: String,
pub(crate) topic: String,
pub(crate) version: Option<String>,
pub(crate) version: String,
#[serde(default)]
pub(crate) create_topic: bool,
#[serde(default)]
Expand Down Expand Up @@ -349,7 +349,7 @@ impl From<ConnectorConfig> for ManagedConnectorSpec {
topic: config.topic,
parameters: config.parameters,
secrets: config.secrets,
version: config.version,
version: config.version.into(),
}
}
}

0 comments on commit fd1fa81

Please sign in to comment.