Skip to content

Commit

Permalink
Merge pull request #402 from zeenix/response-notify
Browse files Browse the repository at this point in the history
🏷️ zb: Add ResponseDispatchNotifier type
  • Loading branch information
zeenix authored Jul 9, 2023
2 parents 63557f0 + b8d6798 commit 79ad20b
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 4 deletions.
3 changes: 2 additions & 1 deletion zbus/src/abstractions/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ pub struct Task<T>(Option<JoinHandle<T>>);
impl<T> Task<T> {
/// Detaches the task to let it keep running in the background.
#[allow(unused_mut)]
pub(crate) fn detach(mut self) {
#[allow(unused)]
pub fn detach(mut self) {
#[cfg(not(feature = "tokio"))]
{
self.0.take().expect("async_task::Task is none").detach()
Expand Down
71 changes: 70 additions & 1 deletion zbus/src/object_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use event_listener::{Event, EventListener};
use serde::Serialize;
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
Expand All @@ -10,7 +12,7 @@ use tracing::{debug, instrument, trace};

use static_assertions::assert_impl_all;
use zbus_names::InterfaceName;
use zvariant::{ObjectPath, OwnedObjectPath, OwnedValue, Value};
use zvariant::{ObjectPath, OwnedObjectPath, OwnedValue, Signature, Type, Value};

use crate::{
async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard},
Expand Down Expand Up @@ -740,3 +742,70 @@ impl From<crate::blocking::ObjectServer> for ObjectServer {
server.into_inner()
}
}

/// A response wrapper that notifies after response has been sent.
///
/// Sometimes in [`dbus_interface`] method implemenations we need to do some other work after the
/// response has been sent off. This wrapper type allows us to do that. Instead of returning your
/// intended response type directly, wrap it in this type and return it from your method. The
/// returned `EventListener` from `new` method will be notified when the response has been sent.
///
/// A typical use case is sending off signals after the response has been sent. The easiest way to
/// do that is to spawn a task from the method that sends the signal but only after being notified
/// of the response dispatch.
///
/// # Caveats
///
/// The notification indicates that the response has been sent off, not that destination peer has
/// received it. That can only be guaranteed for a peer-to-peer connection.
///
/// [`dbus_interface`]: crate::dbus_interface
#[derive(Debug)]
pub struct ResponseDispatchNotifier<R> {
response: R,
event: Option<Event>,
}

impl<R> ResponseDispatchNotifier<R> {
/// Create a new `NotifyResponse`.
pub fn new(response: R) -> (Self, EventListener) {
let event = Event::new();
let listener = event.listen();
(
Self {
response,
event: Some(event),
},
listener,
)
}
}

impl<R> Serialize for ResponseDispatchNotifier<R>
where
R: Serialize,
{
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.response.serialize(serializer)
}
}

impl<R> Type for ResponseDispatchNotifier<R>
where
R: Type,
{
fn signature() -> Signature<'static> {
R::signature()
}
}

impl<T> Drop for ResponseDispatchNotifier<T> {
fn drop(&mut self) {
if let Some(event) = self.event.take() {
event.notify(usize::MAX);
}
}
}
59 changes: 57 additions & 2 deletions zbus/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{collections::HashMap, convert::TryInto};
use tokio::net::UnixStream;

use event_listener::Event;
use futures_util::StreamExt;
use futures_util::{StreamExt, TryStreamExt};
use ntest::timeout;
use serde::{Deserialize, Serialize};
use test_log::test;
Expand All @@ -15,7 +15,7 @@ use tracing::{debug, instrument};
use zbus::{
block_on,
fdo::{ObjectManager, ObjectManagerProxy},
DBusError,
DBusError, MessageBuilder, MessageStream, ResponseDispatchNotifier,
};
use zvariant::{DeserializeDict, OwnedValue, SerializeDict, Str, Type, Value};

Expand Down Expand Up @@ -62,6 +62,8 @@ trait MyIface {

fn test_multi_ret(&self) -> zbus::Result<(i32, String)>;

fn test_response_notify(&self) -> zbus::Result<String>;

fn test_hashmap_return(&self) -> zbus::Result<HashMap<String, String>>;

fn create_obj(&self, key: &str) -> zbus::Result<()>;
Expand Down Expand Up @@ -210,6 +212,32 @@ impl MyIfaceImpl {
Ok((42, String::from("Meaning of life")))
}

#[instrument]
fn test_response_notify(
&self,
#[zbus(connection)] conn: &Connection,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
) -> zbus::fdo::Result<ResponseDispatchNotifier<String>> {
debug!("`TestResponseNotify` called.");
let (response, listener) = ResponseDispatchNotifier::new(String::from("Meaning of life"));
let ctxt = ctxt.to_owned();
conn.executor()
.spawn(
async move {
listener.await;

Self::test_response_notified(ctxt).await.unwrap();
},
"TestResponseNotify",
)
.detach();

Ok(response)
}

#[dbus_interface(signal)]
async fn test_response_notified(ctxt: SignalContext<'_>) -> zbus::Result<()>;

#[instrument]
async fn test_hashmap_return(&self) -> zbus::fdo::Result<HashMap<String, String>> {
debug!("`TestHashmapReturn` called.");
Expand Down Expand Up @@ -400,6 +428,33 @@ fn check_ipv4_address(address: IP4Adress) {
#[instrument]
async fn my_iface_test(conn: Connection, event: Event) -> zbus::Result<u32> {
debug!("client side starting..");
// Use low-level API for `TestResponseNotify` because we need to ensure that the signal is
// always received after the response.
let mut stream = MessageStream::from(&conn);
let method = MessageBuilder::method_call("/org/freedesktop/MyService", "TestResponseNotify")?
.interface("org.freedesktop.MyIface")?
.destination("org.freedesktop.MyService")?
.build(&())?;
let serial = conn.send_message(method).await?;
let mut method_returned = false;
let mut signal_received = false;
while !method_returned && !signal_received {
let msg = stream.try_next().await?.unwrap();

let hdr = msg.header()?;
if hdr.message_type()? == MessageType::MethodReturn && hdr.reply_serial()? == Some(serial) {
assert!(!signal_received);
method_returned = true;
} else if hdr.message_type()? == MessageType::Signal
&& hdr.interface()?.unwrap() == "org.freedesktop.MyService"
&& hdr.member()?.unwrap() == "TestResponseNotified"
{
assert!(method_returned);
signal_received = true;
}
}
drop(stream);

let proxy = MyIfaceProxy::builder(&conn)
.destination("org.freedesktop.MyService")?
.path("/org/freedesktop/MyService")?
Expand Down

0 comments on commit 79ad20b

Please sign in to comment.