Skip to content

Commit

Permalink
complete session implementation; C++ related changes (#1822)
Browse files Browse the repository at this point in the history
* complete session implemetnation; C++ related changes

* Removed lots of .unwraps

* auth->credentials

* Removed most instances of .unwrap from eventhubs and amqp
  • Loading branch information
LarryOsterman authored Sep 27, 2024
1 parent f1dedb4 commit 7d1b96c
Show file tree
Hide file tree
Showing 22 changed files with 580 additions and 237 deletions.
8 changes: 4 additions & 4 deletions sdk/core/azure_core_amqp/src/cbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ impl AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity {
}

impl AmqpClaimsBasedSecurity {
pub fn new(session: AmqpSession) -> Self {
Self {
implementation: CbsImplementation::new(session),
}
pub fn new(session: AmqpSession) -> Result<Self> {
Ok(Self {
implementation: CbsImplementation::new(session)?,
})
}
}
29 changes: 22 additions & 7 deletions sdk/core/azure_core_amqp/src/fe2o3/cbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ pub(crate) struct Fe2o3ClaimsBasedSecurity {
}

impl Fe2o3ClaimsBasedSecurity {
pub fn new(session: AmqpSession) -> Self {
Self {
pub fn new(session: AmqpSession) -> Result<Self> {
Ok(Self {
cbs: OnceLock::new(),
session: session.implementation.get(),
}
session: session.implementation.get()?,
})
}
}

Expand All @@ -46,7 +46,12 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity {
.attach(session.borrow_mut())
.await
.map_err(AmqpManagementAttach::from)?;
self.cbs.set(Mutex::new(cbs_client)).unwrap();
self.cbs.set(Mutex::new(cbs_client)).map_err(|_| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Claims Based Security is already set.",
)
})?;
Ok(())
}

Expand All @@ -69,12 +74,22 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity {
.to_offset(time::UtcOffset::UTC)
.unix_timestamp()
.checked_mul(1_000)
.unwrap(),
.ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Unable to convert time to unix timestamp.",
)
})?,
)),
);
self.cbs
.get()
.unwrap()
.ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Claims Based Security was not set.",
)
})?
.lock()
.await
.borrow_mut()
Expand Down
121 changes: 75 additions & 46 deletions sdk/core/azure_core_amqp/src/fe2o3/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,66 +51,85 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
.container_id(id)
.max_frame_size(65536);

if options.is_some() {
let options = options.unwrap();
if options.max_frame_size.is_some() {
builder = builder.max_frame_size(options.max_frame_size.unwrap());
}
if options.channel_max.is_some() {
builder = builder.channel_max(options.channel_max.unwrap());
}
if options.idle_timeout.is_some() {
builder = builder
.idle_time_out(options.idle_timeout.unwrap().whole_milliseconds() as u32);
}
if options.outgoing_locales.is_some() {
for locale in options.outgoing_locales.as_ref().unwrap() {
builder = builder.add_outgoing_locales(locale.as_str());
}
let options = options.ok_or_else(|| {
azure_core::Error::new(
azure_core::error::ErrorKind::Other,
"Connection options are not set.",
)
})?;

if let Some(frame_size) = options.max_frame_size {
builder = builder.max_frame_size(frame_size);
}

if let Some(channel_max) = options.channel_max {
builder = builder.channel_max(channel_max);
}
if let Some(idle_timeout) = options.idle_timeout {
builder = builder.idle_time_out(idle_timeout.whole_milliseconds() as u32);
}
if let Some(outgoing_locales) = options.outgoing_locales.as_ref() {
for locale in outgoing_locales {
builder = builder.add_outgoing_locales(locale.as_str());
}
if options.incoming_locales.is_some() {
for locale in options.incoming_locales.as_ref().unwrap() {
builder = builder.add_incoming_locales(locale.as_str());
}
}
if let Some(incoming_locales) = options.incoming_locales {
for locale in incoming_locales {
builder = builder.add_incoming_locales(locale.as_str());
}
if options.offered_capabilities.is_some() {
for capability in options.offered_capabilities.unwrap() {
let capability: fe2o3_amqp_types::primitives::Symbol = capability.into();
builder = builder.add_offered_capabilities(capability);
}
}
if let Some(offered_capabilities) = options.offered_capabilities.as_ref() {
for capability in offered_capabilities {
let capability: fe2o3_amqp_types::primitives::Symbol =
capability.clone().into();
builder = builder.add_offered_capabilities(capability);
}
if options.desired_capabilities.is_some() {
for capability in options.desired_capabilities.unwrap() {
let capability: fe2o3_amqp_types::primitives::Symbol = capability.into();
builder = builder.add_desired_capabilities(capability);
}
}
if let Some(desired_capabilities) = options.desired_capabilities.as_ref() {
for capability in desired_capabilities {
let capability: fe2o3_amqp_types::primitives::Symbol =
capability.clone().into();
builder = builder.add_desired_capabilities(capability);
}
if options.properties.is_some() {
let mut fields = fe2o3_amqp::types::definitions::Fields::new();
for property in options.properties.unwrap().iter() {
debug!("Property: {:?}, Value: {:?}", property.0, property.1);
let k: fe2o3_amqp_types::primitives::Symbol = property.0.into();
let v: fe2o3_amqp_types::primitives::Value = property.1.into();
debug!("Property2: {:?}, Value: {:?}", k, v);
}
if let Some(properties) = options.properties.as_ref() {
let mut fields = fe2o3_amqp::types::definitions::Fields::new();
for property in properties.iter() {
let k = fe2o3_amqp_types::primitives::Symbol::from(property.0);
let v = fe2o3_amqp_types::primitives::Value::from(property.1);

fields.insert(k, v);
}
builder = builder.properties(fields);
}
if options.buffer_size.is_some() {
builder = builder.buffer_size(options.buffer_size.unwrap());
fields.insert(k, v);
}
builder = builder.properties(fields);
}
if let Some(buffer_size) = options.buffer_size {
builder = builder.buffer_size(buffer_size);
}

self.connection
.set(Mutex::new(builder.open(url).await.map_err(AmqpOpen::from)?))
.unwrap();
.map_err(|_| {
azure_core::Error::new(
azure_core::error::ErrorKind::Other,
"Connection already set.",
)
})?;
Ok(())
}
}

async fn close(&self) -> Result<()> {
let mut connection = self.connection.get().unwrap().lock().await;
let mut connection = self
.connection
.get()
.ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Connection is not set",
)
})?
.lock()
.await;
connection
.borrow_mut()
.close()
Expand All @@ -124,7 +143,17 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
description: Option<String>,
info: Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> Result<()> {
let mut connection = self.connection.get().unwrap().lock().await;
let mut connection = self
.connection
.get()
.ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Connection is not set",
)
})?
.lock()
.await;
connection
.borrow_mut()
.close_with_error(fe2o3_amqp::types::definitions::Error::new(
Expand Down
27 changes: 21 additions & 6 deletions sdk/core/azure_core_amqp/src/fe2o3/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ impl Fe2o3AmqpManagement {
session: AmqpSession,
client_node_name: impl Into<String>,
access_token: AccessToken,
) -> Self {
) -> Result<Self> {
// Session::get() returns a clone of the underlying session handle.
let session = session.implementation.get();
let session = session.implementation.get()?;

Self {
Ok(Self {
access_token,
client_node_name: client_node_name.into(),
session,
management: OnceLock::new(),
}
})
}
}

Expand All @@ -59,15 +59,30 @@ impl AmqpManagementApis for Fe2o3AmqpManagement {
.await
.map_err(AmqpManagementAttach::from)?;

self.management.set(Mutex::new(management)).unwrap();
self.management.set(Mutex::new(management)).map_err(|_| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Management is already set.",
)
})?;
Ok(())
}
async fn call(
&self,
operation_type: impl Into<String>,
application_properties: AmqpOrderedMap<String, AmqpValue>,
) -> Result<AmqpOrderedMap<String, AmqpValue>> {
let mut management = self.management.get().unwrap().lock().await;
let mut management = self
.management
.get()
.ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"management is not set.",
)
})?
.lock()
.await;

let request = WithApplicationPropertiesRequest::new(
operation_type,
Expand Down
63 changes: 51 additions & 12 deletions sdk/core/azure_core_amqp/src/fe2o3/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
messaging::{AmqpMessage, AmqpMessageBody},
value::AmqpValue,
};
use azure_core::{error::ErrorKind, Error};
use fe2o3_amqp_types::messaging::{message::EmptyBody, IntoBody};
use serde_amqp::{extensions::TransparentVec, Value};
use tracing::info;
Expand Down Expand Up @@ -47,12 +48,31 @@ impl TryInto<AmqpValue> for Vec<Vec<serde_amqp::Value>> {
}
}

/*
* Convert a fe2o3 message into an AMQP message.
*
* Note that this API can be used for four different scenarios:
* 1) Body is empty
* 2) Body is an array of binary blobs
* 3) Body is an AMQP value
* 4) Body is a sequence of AMQP values.
*
* In order to satisfy all four of these, the method is specialized on the type of body element.
* Since the actual body is either <nothing>, a Vec<Vec<u8>> or AmqpValue or Vec<AmqpValue>
* the T specialization is declared as being TryInto<AmqpValue>. That way, when processing the
* empty body or the binary body, we won't call Into<AmqpValue> on the body, and when it is
* a Vec<AmqpValue> or an AmqpValue, we will.
*
* TryInto<T> has a specialization where Into<T> exists, which returns an immutable error
* and in this case there is an fe2o3 Value Into AmqpValue specialization, which means that the call to convert
* the T value into an AmqpValue will always succeed.
*/
fn amqp_message_from_fe2o3_message<T>(
message: fe2o3_amqp_types::messaging::Message<fe2o3_amqp_types::messaging::Body<T>>,
) -> AmqpMessage
) -> azure_core::Result<AmqpMessage>
where
T: std::fmt::Debug + Clone + TryInto<AmqpValue>,
<T as TryInto<AmqpValue>>::Error: std::fmt::Debug,
<T as TryInto<AmqpValue>>::Error: std::error::Error,
{
let mut amqp_message_builder = AmqpMessage::builder();

Expand All @@ -65,22 +85,41 @@ where
let body = AmqpMessageBody::Empty;
amqp_message_builder.with_body(body);
} else if body.is_data() {
let data = body.try_into_data().unwrap();
let data = body.try_into_data().map_err(|_| {
Error::message(
ErrorKind::DataConversion,
"Could not convert AMQP Message Body to data.",
)
})?;
let body = AmqpMessageBody::Binary(data.map(|x| x.to_vec()).collect());
amqp_message_builder.with_body(body);
} else if body.is_value() {
let value = body.try_into_value().unwrap();
let value = body.try_into_value().map_err(|_| {
Error::message(
ErrorKind::DataConversion,
"Could not convert AMQP Message Body to value.",
)
})?;
// Because a conversion exists between fe2o3 values and AmqpValue types,
// this try_into will always succeed.
let value = value.try_into().unwrap();
amqp_message_builder.with_body(AmqpMessageBody::Value(value));
} else if body.is_sequence() {
let sequence = body.try_into_sequence().unwrap();
let sequence = body.try_into_sequence().map_err(|_| {
Error::message(
ErrorKind::DataConversion,
"Could not convert AMQP Message Body to sequence.",
)
})?;

let body = AmqpMessageBody::Sequence(
sequence
.map(|x| {
x.iter()
x.into_iter()
.map(|v| {
let v: AmqpValue = v.clone().try_into().unwrap();
Into::<AmqpValue>::into(v)
// Because a conversion exists between fe2o3 values and AmqpValue types,
// this try_into will always succeed.
TryInto::<AmqpValue>::try_into(v).unwrap()
})
.collect()
})
Expand Down Expand Up @@ -109,7 +148,7 @@ where
amqp_message_builder.with_footer(footer.0.into());
}

amqp_message_builder.build()
Ok(amqp_message_builder.build())
}

impl From<fe2o3_amqp_types::messaging::Message<fe2o3_amqp_types::messaging::Body<Value>>>
Expand All @@ -118,7 +157,7 @@ impl From<fe2o3_amqp_types::messaging::Message<fe2o3_amqp_types::messaging::Body
fn from(
message: fe2o3_amqp_types::messaging::Message<fe2o3_amqp_types::messaging::Body<Value>>,
) -> Self {
amqp_message_from_fe2o3_message(message)
amqp_message_from_fe2o3_message(message).unwrap()
}
}

Expand All @@ -134,7 +173,7 @@ impl
fe2o3_amqp_types::messaging::Body<TransparentVec<fe2o3_amqp_types::messaging::Data>>,
>,
) -> Self {
amqp_message_from_fe2o3_message(message)
amqp_message_from_fe2o3_message(message).unwrap()
}
}

Expand All @@ -154,7 +193,7 @@ impl
>,
>,
) -> Self {
amqp_message_from_fe2o3_message(message)
amqp_message_from_fe2o3_message(message).unwrap()
}
}

Expand Down
Loading

0 comments on commit 7d1b96c

Please sign in to comment.