Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: port collision on large amount of tests #257

Merged
merged 1 commit into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion workspaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ borsh = "0.9"
bs58 = "0.4"
cargo_metadata = { version = "0.14.2", optional = true }
chrono = "0.4.19"
dirs = "3.0.2"
fs2 = "0.4"
hex = "0.4.2"
portpicker = "0.1.1"
rand = "0.8.4"
reqwest = { version = "0.11", features = ["json"] }
serde = "1.0"
serde_json = "1.0"
json-patch = "0.2"
tempfile = "3.3"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
tokio-retry = "0.3"
Expand Down
15 changes: 15 additions & 0 deletions workspaces/src/error/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ use crate::result::ExecutionFailure;
use super::{Error, ErrorKind, ErrorRepr, RpcErrorCode, SandboxErrorCode};

impl ErrorKind {
/// Builds an [`Error`] of this kind from both a message and an underlying error.
///
/// This is a convenience wrapper that forwards `self`, `msg` and `error`
/// unchanged to `Error::full`.
pub(crate) fn full<E, T>(self, msg: T, error: E) -> Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
T: Into<Cow<'static, str>>,
{
Error::full(self, msg, error)
}

pub(crate) fn custom<E>(self, error: E) -> Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
Expand Down Expand Up @@ -134,6 +142,13 @@ impl SandboxErrorCode {
{
Error::custom(ErrorKind::Sandbox(self), error)
}

/// Builds an [`Error`] of kind `ErrorKind::Sandbox(self)` from a message.
///
/// This is a convenience wrapper that forwards to `Error::message`.
pub(crate) fn message<T>(self, msg: T) -> Error
where
T: Into<Cow<'static, str>>,
{
Error::message(ErrorKind::Sandbox(self), msg)
}
}

impl From<SandboxErrorCode> for Error {
Expand Down
5 changes: 3 additions & 2 deletions workspaces/src/network/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::Result;
/// Overwrite the $home_dir/config.json file over a set of entries. `value` will be used per (key, value) pair
/// where value can also be another dict. This recursively sets all entry in `value` dict to the config
/// dict, and saves back into `home_dir` at the end of the day.
fn overwrite(home_dir: &Path, value: Value) -> Result<()> {
fn overwrite(home_dir: impl AsRef<Path>, value: Value) -> Result<()> {
let home_dir = home_dir.as_ref();
let config_file =
File::open(home_dir.join("config.json")).map_err(|err| ErrorKind::Io.custom(err))?;
let config = BufReader::new(config_file);
Expand Down Expand Up @@ -54,7 +55,7 @@ fn max_sandbox_json_payload_size() -> Result<u64> {
}

/// Set extra configs for the sandbox defined by workspaces.
pub(crate) fn set_sandbox_configs(home_dir: &Path) -> Result<()> {
pub(crate) fn set_sandbox_configs(home_dir: impl AsRef<Path>) -> Result<()> {
overwrite(
home_dir,
serde_json::json!({
Expand Down
19 changes: 8 additions & 11 deletions workspaces/src/network/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,22 @@ pub struct Sandbox {
}

impl Sandbox {
pub(crate) fn home_dir(port: u16) -> PathBuf {
let mut path = std::env::temp_dir();
path.push(format!("sandbox-{}", port));
path
}

pub(crate) fn root_signer(&self) -> Result<InMemorySigner> {
let mut path = Self::home_dir(self.server.rpc_port);
path.push("validator_key.json");

let path = self.server.home_dir.path().join("validator_key.json");
InMemorySigner::from_file(&path)
}

pub(crate) async fn new() -> Result<Self> {
let mut server = SandboxServer::default();
server.start().await?;
let mut server = SandboxServer::run_new().await?;
let client = Client::new(&server.rpc_addr());
client.wait_for_rpc().await?;

// Server locks some ports on startup due to potential port collision, so we need
// to unlock the lockfiles after RPC is ready. Not necessarily needed here since
// they get unlocked anyways on the server's drop, but it is nice to clean up the
// lockfiles as soon as possible.
server.unlock_lockfiles()?;

let info = Info {
name: "sandbox".to_string(),
root_id: AccountId::from_str("test.near").unwrap(),
Expand Down
115 changes: 77 additions & 38 deletions workspaces/src/network/server.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,101 @@
use crate::error::SandboxErrorCode;
use crate::network::Sandbox;
use std::fs::File;

use crate::error::{ErrorKind, SandboxErrorCode};
use crate::result::Result;

use async_process::Child;
use fs2::FileExt;
use portpicker::pick_unused_port;
use tempfile::TempDir;
use tracing::info;

use near_sandbox_utils as sandbox;

/// Acquire an unused port and lock it for the duration until the sandbox server has
/// been started.
Comment on lines +14 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was going to suggest the same thing as @DavidM-D, but it seems like this is the key difference. A potential race condition between two nodes seeing that a port is “free” at the same time, then proceeding to simultaneously attempt binding to it. And since there's no global node executor that controls the issuing of ports to nodes, there's no way to gate access to ports if workspaces instances can't communicate with themselves, which is probably why the common denominator used here is the filesystem.

fn acquire_unused_port() -> Result<(u16, File)> {
loop {
let port = pick_unused_port()
.ok_or_else(|| SandboxErrorCode::InitFailure.message("no ports free"))?;
let lockpath = std::env::temp_dir().join(format!("near-sandbox-port{}.lock", port));
let lockfile = File::create(lockpath).map_err(|err| {
ErrorKind::Io.full(format!("failed to create lockfile for port {}", port), err)
})?;
if lockfile.try_lock_exclusive().is_ok() {
Comment on lines +21 to +24
Copy link
Contributor

@miraclx miraclx Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious what `File::create` does to a file that currently exists and is locked. The normal behaviour is to truncate existing files. I wonder if it errors at this point, forcing a failed return? Or would it truncate the file (invalidating the previous lock — unlikely), causing this new lock attempt to fail and then trying a different port? Or would it skip truncation, yet return a handle to the file on which we can attempt to secure a lock?

I assume you've probably tested this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So lock files are purely advisory: they signal that a resource is taken, but they don't actually prevent the file from being written to, so truncation wouldn't error out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha!

break Ok((port, lockfile));
}
}
}

/// Creates a fresh temporary directory and runs `sandbox::init` against it.
///
/// The collected output of the init process is logged under the `workspaces`
/// target, and the [`TempDir`] handle is handed back to the caller.
///
/// # Errors
///
/// Returns an `ErrorKind::Io` error if the temporary directory cannot be
/// created, and a `SandboxErrorCode::InitFailure` error if `sandbox::init`
/// fails or if awaiting the process output fails.
async fn init_home_dir() -> Result<TempDir> {
    let home_dir = tempfile::tempdir().map_err(|err| ErrorKind::Io.custom(err))?;

    let init_process =
        sandbox::init(&home_dir).map_err(|err| SandboxErrorCode::InitFailure.custom(err))?;
    let init_output = init_process
        .output()
        .await
        .map_err(|err| SandboxErrorCode::InitFailure.custom(err))?;
    info!(target: "workspaces", "sandbox init: {:?}", init_output);

    Ok(home_dir)
}

/// Handle to a sandbox server process, together with the ports and the home
/// directory it was started with.
pub struct SandboxServer {
/// Port handed to `sandbox::run` as the RPC port.
pub(crate) rpc_port: u16,
/// Port handed to `sandbox::run` as the network port.
pub(crate) net_port: u16,
/// Temporary directory the sandbox was initialized in and runs from.
pub(crate) home_dir: TempDir,

/// Lockfile held for `rpc_port`; `None` once `unlock_lockfiles` has taken it.
rpc_port_lock: Option<File>,
/// Lockfile held for `net_port`; `None` once `unlock_lockfiles` has taken it.
net_port_lock: Option<File>,
/// Child process of the running sandbox, if any.
process: Option<Child>,
}

impl SandboxServer {
pub fn new(rpc_port: u16, net_port: u16) -> Self {
Self {
rpc_port,
net_port,
process: None,
}
}

pub async fn start(&mut self) -> Result<()> {
if self.process.is_some() {
return Err(SandboxErrorCode::AlreadyStarted.into());
}

info!(target: "workspaces", "Starting up sandbox at localhost:{}", self.rpc_port);
let home_dir = Sandbox::home_dir(self.rpc_port);

pub(crate) async fn run_new() -> Result<Self> {
// Suppress logs for the sandbox binary by default:
supress_sandbox_logs_if_required();

// Remove dir if it already exists:
let _ = std::fs::remove_dir_all(&home_dir);
let output = sandbox::init(&home_dir)
.map_err(|e| SandboxErrorCode::InitFailure.custom(e))?
.output()
.await
.map_err(|e| SandboxErrorCode::InitFailure.custom(e))?;
info!(target: "workspaces", "sandbox init: {:?}", output);
// Acquire and lock an unused rpc_port and net_port to run the server with:
let (rpc_port, rpc_port_lock) = acquire_unused_port()?;
let (net_port, net_port_lock) = acquire_unused_port()?;
let home_dir = init_home_dir().await?;

// Configure `$home_dir/config.json` to our liking. Sandbox requires extra settings
// for the best user experience, and being able to offer patching large state payloads.
crate::network::config::set_sandbox_configs(&home_dir)?;

let child = sandbox::run(&home_dir, self.rpc_port, self.net_port)
let child = sandbox::run(&home_dir, rpc_port, net_port)
.map_err(|e| SandboxErrorCode::RunFailure.custom(e))?;

info!(target: "workspaces", "Started sandbox: pid={:?}", child.id());
self.process = Some(child);
info!(target: "workspaces", "Started up sandbox at localhost:{} with pid={:?}", rpc_port, child.id());

Ok(Self {
rpc_port,
net_port,
home_dir,
rpc_port_lock: Some(rpc_port_lock),
net_port_lock: Some(net_port_lock),
process: Some(child),
})
}

/// Unlocks the port lockfiles that guard against port contention while the
/// sandbox node is starting up.
///
/// Each lock is taken out of its `Option` before being unlocked, so a lock
/// that has already been handled is skipped on later calls.
///
/// # Errors
///
/// Returns an `ErrorKind::Io` error naming the affected port if unlocking a
/// lockfile fails. The RPC lock is processed first; a failure there returns
/// before the network lock is attempted.
pub(crate) fn unlock_lockfiles(&mut self) -> Result<()> {
    let rpc_port = self.rpc_port;
    self.rpc_port_lock
        .take()
        .map_or(Ok(()), |lockfile| lockfile.unlock())
        .map_err(|err| {
            ErrorKind::Io.full(
                format!("failed to unlock lockfile for rpc_port={}", rpc_port),
                err,
            )
        })?;

    let net_port = self.net_port;
    self.net_port_lock
        .take()
        .map_or(Ok(()), |lockfile| lockfile.unlock())
        .map_err(|err| {
            ErrorKind::Io.full(
                format!("failed to unlock lockfile for net_port={}", net_port),
                err,
            )
        })?;

    Ok(())
}
Expand All @@ -61,14 +105,6 @@ impl SandboxServer {
}
}

impl Default for SandboxServer {
fn default() -> Self {
let rpc_port = pick_unused_port().expect("no ports free");
let net_port = pick_unused_port().expect("no ports free");
Self::new(rpc_port, net_port)
}
}

impl Drop for SandboxServer {
fn drop(&mut self) {
if self.process.is_none() {
Expand All @@ -88,6 +124,9 @@ impl Drop for SandboxServer {
.kill()
.map_err(|e| format!("Could not cleanup sandbox due to: {:?}", e))
.unwrap();

// Unlock the ports just in case they have not been preemptively done.
self.unlock_lockfiles().unwrap();
}
}

Expand Down
5 changes: 3 additions & 2 deletions workspaces/src/rpc/patch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ impl<'a, 'b> ImportContractTransaction<'a> {
);
}

// NOTE: For some reason, patching anything with account/contract related items takes two patches
// otherwise its super non-deterministic and mostly just fails to locate the account afterwards: ¯\_(ツ)_/¯
// NOTE: Patching twice here since it takes a while for the first patch to be
// committed to the network; the account wouldn't exist until block finality
// is reached.
self.into_network
.client()
.query(&RpcSandboxPatchStateRequest {
Expand Down