Skip to content

Commit

Permalink
Add mixnodes to self describing api cache (#4684)
Browse files Browse the repository at this point in the history
* Add mixnodes to self describing api cache

* Use NodeRole enum

* Add route for described mixnodes

* Cleanup contract_cache

* Remove nodestatuscache

* wait_until_ready impl
  • Loading branch information
durch committed Jul 9, 2024
1 parent 2ec8349 commit 3d0b70a
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 55 deletions.
20 changes: 19 additions & 1 deletion nym-api/nym-api-requests/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0

use crate::nym_nodes::NodeRole;
use crate::pagination::PaginatedResponse;
use cosmwasm_std::{Addr, Coin, Decimal};
use nym_mixnet_contract_common::families::FamilyHead;
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::reward_params::{Performance, RewardingParams};
use nym_mixnet_contract_common::rewarding::RewardEstimate;
use nym_mixnet_contract_common::{
GatewayBond, IdentityKey, Interval, MixId, MixNode, Percent, RewardedSetNodeStatus,
GatewayBond, IdentityKey, Interval, MixId, MixNode, MixNodeBond, Percent, RewardedSetNodeStatus,
};
use nym_node_requests::api::v1::node::models::{AuxiliaryDetails, BinaryBuildInformationOwned};
use schemars::gen::SchemaGenerator;
Expand Down Expand Up @@ -607,6 +608,8 @@ pub struct NymNodeDescription {

// for now we only care about their ws/wss situation, nothing more
pub mixnet_websockets: WebSockets,

pub role: NodeRole,
}

#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
Expand All @@ -624,6 +627,21 @@ impl From<GatewayBond> for DescribedGateway {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct DescribedMixNode {
pub bond: MixNodeBond,
pub self_described: Option<NymNodeDescription>,
}

impl From<MixNodeBond> for DescribedMixNode {
fn from(bond: MixNodeBond) -> Self {
DescribedMixNode {
bond,
self_described: None,
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema)]
pub struct NetworkRequesterDetails {
/// address of the embedded network requester
Expand Down
156 changes: 107 additions & 49 deletions nym-api/src/node_describe_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use futures::{stream, StreamExt};
use nym_api_requests::models::{
IpPacketRouterDetails, NetworkRequesterDetails, NymNodeDescription,
};
use nym_api_requests::nym_nodes::NodeRole;
use nym_config::defaults::{mainnet, DEFAULT_NYM_NODE_HTTP_PORT};
use nym_contracts_common::IdentityKey;
use nym_mixnet_contract_common::Gateway;
use nym_node_requests::api::client::{NymNodeApiClientError, NymNodeApiClientExt};
use std::collections::HashMap;
use thiserror::Error;
Expand Down Expand Up @@ -56,8 +56,6 @@ pub enum NodeDescribeCacheError {
}

pub struct NodeDescriptionProvider {
// for now we only care about gateways, nothing more
// network_gateways: SharedCache<Vec<GatewayBond>>,
contract_cache: NymContractCache,

batch_size: usize,
Expand All @@ -79,27 +77,32 @@ impl NodeDescriptionProvider {
}

async fn try_get_client(
gateway: &Gateway,
host: &str,
identity_key: &IdentityKey,
port: Option<u16>,
) -> Result<nym_node_requests::api::Client, NodeDescribeCacheError> {
let gateway_host = &gateway.host;

// first try the standard port in case the operator didn't put the node behind the proxy,
// then default https (443)
// finally default http (80)
let addresses_to_try = vec![
format!("http://{gateway_host}:{DEFAULT_NYM_NODE_HTTP_PORT}"),
format!("https://{gateway_host}"),
format!("http://{gateway_host}"),
let mut addresses_to_try = vec![
format!("http://{host}:{DEFAULT_NYM_NODE_HTTP_PORT}"),
format!("http://{host}:8000"),
format!("https://{host}"),
format!("http://{host}"),
];

if let Some(port) = port {
addresses_to_try.insert(0, format!("http://{host}:{port}"));
}

for address in addresses_to_try {
// if provided host was malformed, no point in continuing
let client = match nym_node_requests::api::Client::new_url(address, None) {
Ok(client) => client,
Err(err) => {
return Err(NodeDescribeCacheError::MalformedHost {
host: gateway_host.clone(),
gateway: gateway.identity_key.clone(),
host: host.to_string(),
gateway: identity_key.clone(),
source: err,
});
}
Expand All @@ -112,28 +115,28 @@ async fn try_get_client(
}

Err(NodeDescribeCacheError::NoHttpPortsAvailable {
host: gateway_host.clone(),
gateway: gateway.identity_key.clone(),
host: host.to_string(),
gateway: identity_key.to_string(),
})
}

async fn get_gateway_description(
gateway: Gateway,
async fn try_get_description(
data: RefreshData,
) -> Result<(IdentityKey, NymNodeDescription), NodeDescribeCacheError> {
let client = try_get_client(&gateway).await?;
let client = try_get_client(&data.host(), &data.identity_key(), data.port()).await?;

let host_info =
client
.get_host_information()
.await
.map_err(|err| NodeDescribeCacheError::ApiFailure {
gateway: gateway.identity_key.clone(),
gateway: data.identity_key().to_string(),
source: err,
})?;

if !host_info.verify_host_information() {
return Err(NodeDescribeCacheError::MissignedHostInformation {
gateway: gateway.identity_key,
gateway: data.identity_key().clone(),
});
}

Expand All @@ -142,29 +145,29 @@ async fn get_gateway_description(
.get_build_information()
.await
.map_err(|err| NodeDescribeCacheError::ApiFailure {
gateway: gateway.identity_key.clone(),
gateway: data.identity_key().clone(),
source: err,
})?;

// this can be an old node that hasn't yet exposed this
let auxiliary_details = client.get_auxiliary_details().await.inspect_err(|err| {
debug!("could not obtain auxiliary details of gateway {}: {err} is it running an old version?", gateway.identity_key);
debug!("could not obtain auxiliary details of node {}: {err} is it running an old version?", data.identity_key());
}).unwrap_or_default();

let websockets =
client
.get_mixnet_websockets()
.await
.map_err(|err| NodeDescribeCacheError::ApiFailure {
gateway: gateway.identity_key.clone(),
gateway: data.identity_key().clone(),
source: err,
})?;

let network_requester =
if let Ok(nr) = client.get_network_requester().await {
let exit_policy = client.get_exit_policy().await.map_err(|err| {
NodeDescribeCacheError::ApiFailure {
gateway: gateway.identity_key.clone(),
gateway: data.identity_key().clone(),
source: err,
}
})?;
Expand Down Expand Up @@ -194,9 +197,44 @@ async fn get_gateway_description(
ip_packet_router,
mixnet_websockets: websockets.into(),
auxiliary_details,
role: data.role(),
};

Ok((gateway.identity_key, description))
Ok((data.identity_key().clone(), description))
}

struct RefreshData {
host: String,
identity_key: IdentityKey,
role: NodeRole,
port: Option<u16>,
}

impl RefreshData {
pub fn new(host: String, identity_key: IdentityKey, role: NodeRole, port: Option<u16>) -> Self {
RefreshData {
host,
identity_key,
role,
port,
}
}

pub fn host(&self) -> String {
self.host.clone()
}

pub fn identity_key(&self) -> IdentityKey {
self.identity_key.clone()
}

pub fn port(&self) -> Option<u16> {
self.port
}

pub fn role(&self) -> NodeRole {
self.role.clone()
}
}

#[async_trait]
Expand All @@ -209,36 +247,56 @@ impl CacheItemProvider for NodeDescriptionProvider {
}

async fn try_refresh(&self) -> Result<Self::Item, Self::Error> {
let gateways = self.contract_cache.gateways_all().await;

// let guard = self.network_gateways.get().await?;
// let gateways = &*guard;
let mut host_id_pairs = self
.contract_cache
.gateways_all()
.await
.into_iter()
.map(|full| {
RefreshData::new(
full.gateway.host,
full.gateway.identity_key,
NodeRole::EntryGateway,
None,
)
})
.collect::<Vec<RefreshData>>();

if gateways.is_empty() {
host_id_pairs.extend(
self.contract_cache
.mixnodes_all()
.await
.into_iter()
.map(|full| {
RefreshData::new(
full.bond_information.mix_node.host,
full.bond_information.mix_node.identity_key,
NodeRole::Mixnode {
layer: full.bond_information.layer.into(),
},
Some(full.bond_information.mix_node.mix_port),
)
})
.collect::<Vec<RefreshData>>(),
);

if host_id_pairs.is_empty() {
return Ok(HashMap::new());
}

// TODO: somehow bypass the 'higher-ranked lifetime error' and remove that redundant clone
let node_description = stream::iter(
gateways
// .deref()
// .clone()
.into_iter()
.map(|bond| bond.gateway)
.map(get_gateway_description),
)
.buffer_unordered(self.batch_size)
.filter_map(|res| async move {
match res {
Ok((identity, description)) => Some((identity, description)),
Err(err) => {
debug!("failed to obtain gateway self-described data: {err}");
None
let node_description = stream::iter(host_id_pairs.into_iter().map(try_get_description))
.buffer_unordered(self.batch_size)
.filter_map(|res| async move {
match res {
Ok((identity, description)) => Some((identity, description)),
Err(err) => {
debug!("failed to obtain gateway self-described data: {err}");
None
}
}
}
})
.collect::<HashMap<_, _>>()
.await;
})
.collect::<HashMap<_, _>>()
.await;

Ok(node_description)
}
Expand Down
2 changes: 1 addition & 1 deletion nym-api/src/nym_nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod unstable_routes;
/// Merges the routes with http information and returns it to Rocket for serving
pub(crate) fn nym_node_routes(settings: &OpenApiSettings) -> (Vec<Route>, OpenApi) {
openapi_get_routes_spec![
settings: routes::get_gateways_described
settings: routes::get_gateways_described, routes::get_mixnodes_described
]
}

Expand Down
38 changes: 37 additions & 1 deletion nym-api/src/nym_nodes/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
use crate::node_describe_cache::DescribedNodes;
use crate::nym_contract_cache::cache::NymContractCache;
use crate::support::caching::cache::SharedCache;
use nym_api_requests::models::DescribedGateway;
use nym_api_requests::models::{DescribedGateway, DescribedMixNode};
use nym_mixnet_contract_common::MixNodeBond;
use rocket::serde::json::Json;
use rocket::State;
use rocket_okapi::openapi;
Expand Down Expand Up @@ -42,3 +43,38 @@ pub async fn get_gateways_described(
.collect(),
)
}

#[openapi(tag = "Nym Nodes")]
#[get("/mixnodes/described")]
pub async fn get_mixnodes_described(
contract_cache: &State<NymContractCache>,
describe_cache: &State<SharedCache<DescribedNodes>>,
) -> Json<Vec<DescribedMixNode>> {
let mixnodes = contract_cache
.mixnodes_filtered()
.await
.into_iter()
.map(|m| m.bond_information)
.collect::<Vec<MixNodeBond>>();
if mixnodes.is_empty() {
return Json(Vec::new());
}

// if the self describe cache is unavailable, well, don't attach describe data
let Ok(self_descriptions) = describe_cache.get().await else {
return Json(mixnodes.into_iter().map(Into::into).collect());
};

// TODO: this is extremely inefficient, but given we don't have many gateways,
// it shouldn't be too much of a problem until we go ahead with directory v3 / the smoosh 2: electric smoosharoo,
// but at that point (I hope) the whole caching situation should get refactored
Json(
mixnodes
.into_iter()
.map(|bond| DescribedMixNode {
self_described: self_descriptions.deref().get(bond.identity()).cloned(),
bond,
})
.collect(),
)
}
Loading

0 comments on commit 3d0b70a

Please sign in to comment.