From 838c1068f3a1fc058177cf3df42e90a4d82e2f04 Mon Sep 17 00:00:00 2001 From: MaxVerevkin Date: Wed, 18 Oct 2023 15:48:08 +0300 Subject: [PATCH 1/7] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20zb:=20update=20event-l?= =?UTF-8?q?istener=20to=203.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- book/src/blocking.md | 4 ++-- book/src/server.md | 4 ++-- zbus/Cargo.toml | 2 +- zbus/src/blocking/connection/mod.rs | 16 ++++++++++++---- zbus/src/blocking/object_server.rs | 4 ++-- zbus/src/connection/mod.rs | 2 +- zbus/src/object_server/mod.rs | 2 +- zbus/src/proxy/mod.rs | 2 +- zbus/tests/e2e.rs | 2 ++ 9 files changed, 24 insertions(+), 14 deletions(-) diff --git a/book/src/blocking.md b/book/src/blocking.md index a04e9741d..391bfe8a4 100644 --- a/book/src/blocking.md +++ b/book/src/blocking.md @@ -185,13 +185,13 @@ fn main() -> Result<(), Box> { name: "GreeterName".to_string(), done: event_listener::Event::new(), }; - let done_listener = greeter.done.listen(); + let mut done_listener = greeter.done.listen(); let _handle = connection::Builder::session()? .name("org.zbus.MyGreeter")? .serve_at("/org/zbus/MyGreeter", greeter)? .build()?; - done_listener.wait(); + done_listener.as_mut().wait(); Ok(()) } diff --git a/book/src/server.md b/book/src/server.md index 9319ebd6e..89812722b 100644 --- a/book/src/server.md +++ b/book/src/server.md @@ -262,14 +262,14 @@ async fn main() -> Result<()> { name: "GreeterName".to_string(), done: event_listener::Event::new(), }; - let done_listener = greeter.done.listen(); + let mut done_listener = greeter.done.listen(); let _connection = Builder::session()? .name("org.zbus.MyGreeter")? .serve_at("/org/zbus/MyGreeter", greeter)? .build() .await?; - done_listener.wait(); + done_listener.as_mut().wait(); Ok(()) } diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index 2b11d4293..1b3866757 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -62,7 +62,7 @@ hex = "0.4.3" ordered-stream = "0.2" rand = "0.8.5" sha1 = { version = "0.10.5", features = ["std"] } -event-listener = "2.5.3" +event-listener = "3.0.0" static_assertions = "1.1.0" async-trait = "0.1.58" async-fs = { version = "1.6.0", optional = true } diff --git a/zbus/src/blocking/connection/mod.rs b/zbus/src/blocking/connection/mod.rs index ff968b055..e344448e4 100644 --- a/zbus/src/blocking/connection/mod.rs +++ b/zbus/src/blocking/connection/mod.rs @@ -313,7 +313,10 @@ mod tests { }); let c = Builder::unix_stream(p1).p2p().build().unwrap(); - let listener = c.monitor_activity(); + + let mut listener = Box::pin(c.monitor_activity()); + listener.as_mut().listen(); + let mut s = MessageIterator::from(&c); tx.send(()).unwrap(); let m = s.next().unwrap().unwrap(); @@ -326,11 +329,16 @@ mod tests { assert_eq!(val, "yay"); // there was some activity - listener.wait(); + listener.as_mut().wait(); // eventually, nothing happens and it will timeout loop { - let listener = c.monitor_activity(); - if !listener.wait_timeout(std::time::Duration::from_millis(10)) { + let mut listener = Box::pin(c.monitor_activity()); + listener.as_mut().listen(); + if listener + .as_mut() + .wait_timeout(std::time::Duration::from_millis(10)) + .is_none() + { break; } } diff --git a/zbus/src/blocking/object_server.rs b/zbus/src/blocking/object_server.rs index 74657c4de..03d20511f 100644 --- a/zbus/src/blocking/object_server.rs +++ b/zbus/src/blocking/object_server.rs @@ -115,13 +115,13 @@ where /// let connection = Connection::session()?; /// /// let quit_event = Event::new(); -/// let quit_listener = quit_event.listen(); +/// let mut quit_listener = quit_event.listen(); /// let interface = Example::new(quit_event); /// connection /// .object_server() /// .at("/org/zbus/path", interface)?; /// -/// quit_listener.wait(); +/// quit_listener.as_mut().wait(); /// # Ok::<_, Box>(()) /// ``` #[derive(Debug)] diff --git a/zbus/src/connection/mod.rs b/zbus/src/connection/mod.rs index e28007270..54dbb558d 100644 --- a/zbus/src/connection/mod.rs +++ b/zbus/src/connection/mod.rs @@ -1213,7 +1213,7 @@ impl Connection { /// /// This function is meant for the caller to implement idle or timeout on inactivity. pub fn monitor_activity(&self) -> EventListener { - self.inner.activity_event.listen() + EventListener::new(&self.inner.activity_event) } /// Returns the peer credentials. diff --git a/zbus/src/object_server/mod.rs b/zbus/src/object_server/mod.rs index cbd3dc03c..1c674f149 100644 --- a/zbus/src/object_server/mod.rs +++ b/zbus/src/object_server/mod.rs @@ -811,7 +811,7 @@ impl ResponseDispatchNotifier { /// Create a new `NotifyResponse`. pub fn new(response: R) -> (Self, EventListener) { let event = Event::new(); - let listener = event.listen(); + let listener = EventListener::new(&event); ( Self { response, diff --git a/zbus/src/proxy/mod.rs b/zbus/src/proxy/mod.rs index 53f9445f3..ea88c2aee 100644 --- a/zbus/src/proxy/mod.rs +++ b/zbus/src/proxy/mod.rs @@ -209,7 +209,7 @@ where pub struct PropertyStream<'a, T> { name: &'a str, proxy: Proxy<'a>, - changed_listener: EventListener, + changed_listener: Pin>, phantom: std::marker::PhantomData, } diff --git a/zbus/tests/e2e.rs b/zbus/tests/e2e.rs index faf74339d..32d3a8fc8 100644 --- a/zbus/tests/e2e.rs +++ b/zbus/tests/e2e.rs @@ -232,6 +232,8 @@ impl MyIfaceImpl { ) -> zbus::fdo::Result> { debug!("`TestResponseNotify` called."); let (response, listener) = ResponseDispatchNotifier::new(String::from("Meaning of life")); + let mut listener = Box::pin(listener); + listener.as_mut().listen(); let ctxt = ctxt.to_owned(); conn.executor() .spawn( From fac1c34cd32c8eb269ad6a1f25468401d0da5954 Mon Sep 17 00:00:00 2001 From: MaxVerevkin Date: Wed, 18 Oct 2023 15:50:29 +0300 Subject: [PATCH 2/7] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20zb:=20update=20async-b?= =?UTF-8?q?roadcast=20to=200.6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/Cargo.toml | 2 +- zbus/src/connection/socket_reader.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index 1b3866757..5c2413a96 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -54,7 +54,7 @@ futures-util = { version = "0.3.25", default-features = false, features = [ "std", ] } async-lock = { version = "2.6.0", optional = true } -async-broadcast = "0.5.0" +async-broadcast = "0.6.0" async-executor = { version = "1.5.0", optional = true } blocking = { version = "1.0.2", optional = true } async-task = { version = "4.3.0", optional = true } diff --git a/zbus/src/connection/socket_reader.rs b/zbus/src/connection/socket_reader.rs index 8e0f2e646..e468f19aa 100644 --- a/zbus/src/connection/socket_reader.rs +++ b/zbus/src/connection/socket_reader.rs @@ -69,7 +69,7 @@ impl SocketReader { } } - if let Err(e) = sender.broadcast(msg.clone()).await { + if let Err(e) = sender.broadcast_direct(msg.clone()).await { // An error would be due to either of these: // // 1. the channel is closed. From be62e89d0c872c8393157062d971b285d08c266d Mon Sep 17 00:00:00 2001 From: MaxVerevkin Date: Wed, 18 Oct 2023 16:02:35 +0300 Subject: [PATCH 3/7] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20zb:=20update=20async-f?= =?UTF-8?q?s=20to=202.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index 5c2413a96..5faa19cec 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -65,7 +65,7 @@ sha1 = { version = "0.10.5", features = ["std"] } event-listener = "3.0.0" static_assertions = "1.1.0" async-trait = "0.1.58" -async-fs = { version = "1.6.0", optional = true } +async-fs = { version = "2.0.0", optional = true } # FIXME: We should only enable process feature for Mac OS. See comment on async-process below for why we can't. tokio = { version = "1.21.2", optional = true, features = [ "rt", From 455c8020f878251d09d547e2be777b8b98aa4232 Mon Sep 17 00:00:00 2001 From: MaxVerevkin Date: Fri, 10 Nov 2023 10:16:36 +0200 Subject: [PATCH 4/7] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20zb:=20update=20async-l?= =?UTF-8?q?ock=20to=203.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index 5faa19cec..a29d8048f 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -53,7 +53,7 @@ futures-util = { version = "0.3.25", default-features = false, features = [ "sink", "std", ] } -async-lock = { version = "2.6.0", optional = true } +async-lock = { version = "3.0.0", optional = true } async-broadcast = "0.6.0" async-executor = { version = "1.5.0", optional = true } blocking = { version = "1.0.2", optional = true } From 5e726aafdf784e9d9f13baa25d2f349b1bec36ec Mon Sep 17 00:00:00 2001 From: MaxVerevkin Date: Fri, 10 Nov 2023 10:25:48 +0200 Subject: [PATCH 5/7] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20zb:=20update=20async-i?= =?UTF-8?q?o=20to=202.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/Cargo.toml | 2 +- zbus/src/address.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index a29d8048f..0e48b336b 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -46,7 +46,7 @@ zbus_macros = { path = "../zbus_macros", version = "=4.0.0" } enumflags2 = { version = "0.7.7", features = ["serde"] } derivative = "2.2" once_cell = "1.4.0" -async-io = { version = "1.12.0", optional = true } +async-io = { version = "2.0.0", optional = true } futures-core = "0.3.25" futures-sink = "0.3.25" futures-util = { version = "0.3.25", default-features = false, features = [ diff --git a/zbus/src/address.rs b/zbus/src/address.rs index 27efe6c37..c4558c083 100644 --- a/zbus/src/address.rs +++ b/zbus/src/address.rs @@ -333,6 +333,7 @@ impl Address { Address::Tcp(addr) => connect_tcp(addr).await.map(Stream::Tcp), Address::NonceTcp { addr, nonce_file } => { + #[allow(unused_mut)] let mut stream = connect_tcp(addr).await?; #[cfg(unix)] @@ -352,7 +353,7 @@ impl Address { while !nonce.is_empty() { let len = stream - .write_with_mut(|s| std::io::Write::write(s, nonce)) + .write_with(|mut s| std::io::Write::write(&mut s, nonce)) .await?; nonce = &nonce[len..]; } From fe2ee2dbf9021ae35c15b302e6c46307a7b35135 Mon Sep 17 00:00:00 2001 From: MaxVerevkin Date: Fri, 10 Nov 2023 10:27:01 +0200 Subject: [PATCH 6/7] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20zb:=20update=20async-p?= =?UTF-8?q?rocess=20to=202.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index 0e48b336b..e4092b830 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -108,7 +108,7 @@ nix = { version = "0.27", default-features = false, features = [ [target.'cfg(target_os = "macos")'.dependencies] # FIXME: This should only be enabled if async-io feature is enabled but currently # Cargo doesn't provide a way to do that for only specific target OS: https://github.com/rust-lang/cargo/issues/1197. -async-process = "1.7.0" +async-process = "2.0.0" [target.'cfg(any(target_os = "macos", windows))'.dependencies] async-recursion = "1.0.0" From 6010611d04ae0b7afe9311e92fd9e12e79410ef1 Mon Sep 17 00:00:00 2001 From: MaxVerevkin Date: Sat, 25 Nov 2023 10:28:18 +0200 Subject: [PATCH 7/7] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20zb:=20update=20event-l?= =?UTF-8?q?istener=20to=20v4.0.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/Cargo.toml | 2 +- zbus/src/blocking/connection/mod.rs | 10 ++++------ zbus/src/connection/mod.rs | 4 ++-- zbus/src/object_server/mod.rs | 5 +++-- zbus/tests/e2e.rs | 2 -- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index e4092b830..8a27b8930 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -62,7 +62,7 @@ hex = "0.4.3" ordered-stream = "0.2" rand = "0.8.5" sha1 = { version = "0.10.5", features = ["std"] } -event-listener = "3.0.0" +event-listener = "4.0.0" static_assertions = "1.1.0" async-trait = "0.1.58" async-fs = { version = "2.0.0", optional = true } diff --git a/zbus/src/blocking/connection/mod.rs b/zbus/src/blocking/connection/mod.rs index e344448e4..203070a79 100644 --- a/zbus/src/blocking/connection/mod.rs +++ b/zbus/src/blocking/connection/mod.rs @@ -3,7 +3,7 @@ use enumflags2::BitFlags; use event_listener::EventListener; use static_assertions::assert_impl_all; -use std::{io, ops::Deref}; +use std::{io, ops::Deref, pin::Pin}; use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName}; use zvariant::ObjectPath; @@ -241,7 +241,7 @@ impl Connection { /// Returns a listener, notified on various connection activity. /// /// This function is meant for the caller to implement idle or timeout on inactivity. - pub fn monitor_activity(&self) -> EventListener { + pub fn monitor_activity(&self) -> Pin> { self.inner.monitor_activity() } @@ -314,8 +314,7 @@ mod tests { let c = Builder::unix_stream(p1).p2p().build().unwrap(); - let mut listener = Box::pin(c.monitor_activity()); - listener.as_mut().listen(); + let mut listener = c.monitor_activity(); let mut s = MessageIterator::from(&c); tx.send(()).unwrap(); @@ -332,8 +331,7 @@ mod tests { listener.as_mut().wait(); // eventually, nothing happens and it will timeout loop { - let mut listener = Box::pin(c.monitor_activity()); - listener.as_mut().listen(); + let mut listener = c.monitor_activity(); if listener .as_mut() .wait_timeout(std::time::Duration::from_millis(10)) diff --git a/zbus/src/connection/mod.rs b/zbus/src/connection/mod.rs index 54dbb558d..d2ee12269 100644 --- a/zbus/src/connection/mod.rs +++ b/zbus/src/connection/mod.rs @@ -1212,8 +1212,8 @@ impl Connection { /// Returns a listener, notified on various connection activity. /// /// This function is meant for the caller to implement idle or timeout on inactivity. - pub fn monitor_activity(&self) -> EventListener { - EventListener::new(&self.inner.activity_event) + pub fn monitor_activity(&self) -> Pin> { + self.inner.activity_event.listen() } /// Returns the peer credentials. diff --git a/zbus/src/object_server/mod.rs b/zbus/src/object_server/mod.rs index 1c674f149..c24c46246 100644 --- a/zbus/src/object_server/mod.rs +++ b/zbus/src/object_server/mod.rs @@ -7,6 +7,7 @@ use std::{ fmt::Write, marker::PhantomData, ops::{Deref, DerefMut}, + pin::Pin, sync::Arc, }; use tracing::{debug, instrument, trace}; @@ -809,9 +810,9 @@ pub struct ResponseDispatchNotifier { impl ResponseDispatchNotifier { /// Create a new `NotifyResponse`. - pub fn new(response: R) -> (Self, EventListener) { + pub fn new(response: R) -> (Self, Pin>) { let event = Event::new(); - let listener = EventListener::new(&event); + let listener = event.listen(); ( Self { response, diff --git a/zbus/tests/e2e.rs b/zbus/tests/e2e.rs index 32d3a8fc8..faf74339d 100644 --- a/zbus/tests/e2e.rs +++ b/zbus/tests/e2e.rs @@ -232,8 +232,6 @@ impl MyIfaceImpl { ) -> zbus::fdo::Result> { debug!("`TestResponseNotify` called."); let (response, listener) = ResponseDispatchNotifier::new(String::from("Meaning of life")); - let mut listener = Box::pin(listener); - listener.as_mut().listen(); let ctxt = ctxt.to_owned(); conn.executor() .spawn(