Skip to content

Commit 9ddded9

Browse files
committed
Expire subscriptions after TTL
1 parent b25c03c commit 9ddded9

File tree

5 files changed

+154
-4
lines changed

5 files changed

+154
-4
lines changed

crates/core/src/sync/interface.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::schema::Schema;
1717
use crate::state::DatabaseState;
1818
use crate::sync::subscriptions::{apply_subscriptions, SubscriptionChangeRequest};
1919
use crate::sync::BucketPriority;
20+
use crate::util::JsonString;
2021

2122
use super::streaming_sync::SyncClient;
2223
use super::sync_status::DownloadSyncStatus;
@@ -142,7 +143,7 @@ pub struct RequestedStreamSubscription {
142143
/// The name of the sync stream to subscribe to.
143144
pub stream: String,
144145
/// Parameters to make available in the stream's definition.
145-
pub parameters: Box<serde_json::value::RawValue>,
146+
pub parameters: Option<Box<JsonString>>,
146147
pub override_priority: Option<BucketPriority>,
147148
#[serde_as(as = "DisplayFromStr")]
148149
pub client_id: i64,

crates/core/src/sync/storage_adapter.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
88
use crate::{
99
error::{PSResult, PowerSyncError},
1010
ext::SafeManagedStmt,
11+
kv::client_id,
1112
operations::delete_bucket,
1213
schema::Schema,
1314
state::DatabaseState,
@@ -36,6 +37,7 @@ pub struct StorageAdapter {
3637
pub progress_stmt: ManagedStmt,
3738
time_stmt: ManagedStmt,
3839
delete_subscription: ManagedStmt,
40+
update_subscription: ManagedStmt,
3941
}
4042

4143
impl StorageAdapter {
@@ -52,11 +54,16 @@ impl StorageAdapter {
5254
let delete_subscription =
5355
db.prepare_v2("DELETE FROM ps_stream_subscriptions WHERE id = ?")?;
5456

57+
// language=SQLite
58+
let update_subscription =
59+
db.prepare_v2("UPDATE ps_stream_subscriptions SET active = ?2, is_default = ?3, ttl = ?, expires_at = ?, last_synced_at = ? WHERE id = ?1")?;
60+
5561
Ok(Self {
5662
db,
5763
progress_stmt: progress,
5864
time_stmt: time,
5965
delete_subscription,
66+
update_subscription,
6067
})
6168
}
6269

@@ -256,7 +263,23 @@ impl StorageAdapter {
256263
&self,
257264
include_defaults: bool,
258265
) -> Result<StreamSubscriptionRequest, PowerSyncError> {
266+
self.delete_outdated_subscriptions()?;
267+
259268
let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();
269+
let stmt = self
270+
.db
271+
.prepare_v2("SELECT * FROM ps_stream_subscriptions WHERE NOT is_default;")?;
272+
273+
while let ResultCode::ROW = stmt.step()? {
274+
let subscription = Self::read_stream_subscription(&stmt)?;
275+
276+
subscriptions.push(RequestedStreamSubscription {
277+
stream: subscription.stream_name,
278+
parameters: subscription.local_params,
279+
override_priority: subscription.local_priority,
280+
client_id: subscription.id,
281+
});
282+
}
260283

261284
Ok(StreamSubscriptionRequest {
262285
include_defaults,
@@ -296,6 +319,12 @@ impl StorageAdapter {
296319
})
297320
}
298321

322+
fn delete_outdated_subscriptions(&self) -> Result<(), PowerSyncError> {
323+
self.db
324+
.exec_safe("DELETE FROM ps_stream_subscriptions WHERE expires_at < unixepoch()")?;
325+
Ok(())
326+
}
327+
299328
pub fn iterate_local_subscriptions<F: FnMut(LocallyTrackedSubscription) -> ()>(
300329
&self,
301330
mut action: F,
@@ -324,6 +353,39 @@ impl StorageAdapter {
324353
}
325354
}
326355

356+
pub fn update_subscription(
357+
&self,
358+
subscription: &LocallyTrackedSubscription,
359+
) -> Result<(), PowerSyncError> {
360+
let _ = self.update_subscription.reset();
361+
362+
self.update_subscription.bind_int64(1, subscription.id)?;
363+
self.update_subscription
364+
.bind_int(2, if subscription.active { 1 } else { 0 })?;
365+
self.update_subscription
366+
.bind_int(3, if subscription.is_default { 1 } else { 0 })?;
367+
if let Some(ttl) = subscription.ttl {
368+
self.update_subscription.bind_int64(4, ttl)?;
369+
} else {
370+
self.update_subscription.bind_null(4)?;
371+
}
372+
373+
if let Some(expires_at) = subscription.expires_at {
374+
self.update_subscription.bind_int64(5, expires_at)?;
375+
} else {
376+
self.update_subscription.bind_null(5)?;
377+
}
378+
379+
if let Some(last_synced_at) = subscription.last_synced_at {
380+
self.update_subscription.bind_int64(6, last_synced_at)?;
381+
} else {
382+
self.update_subscription.bind_null(6)?;
383+
}
384+
385+
self.update_subscription.exec()?;
386+
Ok(())
387+
}
388+
327389
pub fn delete_subscription(&self, id: i64) -> Result<(), PowerSyncError> {
328390
let _ = self.delete_subscription.reset();
329391
self.delete_subscription.bind_int64(1, id)?;

crates/core/src/sync/streaming_sync.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ impl StreamingSyncIteration {
571571
tracked: &TrackedCheckpoint,
572572
) -> Result<Vec<ActiveStreamSubscription>, PowerSyncError> {
573573
let mut tracked_subscriptions: Vec<LocallyTrackedSubscription> = Vec::new();
574+
let now = self.adapter.now()?;
574575

575576
// Load known subscriptions from database
576577
self.adapter.iterate_local_subscriptions(|mut sub| {
@@ -588,9 +589,14 @@ impl StreamingSyncIteration {
588589
.filter(|s| s.stream_name == subscription.name);
589590

590591
let mut has_local = false;
591-
for subscription in matching_local_subscriptions {
592-
subscription.active = true;
592+
for local in matching_local_subscriptions {
593+
local.active = true;
594+
local.is_default = subscription.is_default;
593595
has_local = true;
596+
597+
if let Some(ttl) = local.ttl {
598+
local.expires_at = Some(now.0 + ttl);
599+
}
594600
}
595601

596602
if !has_local && subscription.is_default {
@@ -601,8 +607,10 @@ impl StreamingSyncIteration {
601607

602608
// Clean up default subscriptions that are no longer active.
603609
for subscription in &tracked_subscriptions {
604-
if subscription.is_default && !subscription.active {
610+
if !subscription.has_subscribed_manually() && !subscription.active {
605611
self.adapter.delete_subscription(subscription.id)?;
612+
} else {
613+
self.adapter.update_subscription(subscription)?;
606614
}
607615
}
608616
tracked_subscriptions

crates/core/src/sync/subscriptions.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,19 @@ pub struct LocallyTrackedSubscription {
3333
}
3434

3535
impl LocallyTrackedSubscription {
36+
/// The default TTL of non-default subscriptions if none is set: One day.
37+
pub const DEFAULT_TTL: i64 = 60 * 60 * 24;
38+
3639
pub fn key(&self) -> SubscriptionKey {
3740
SubscriptionKey {
3841
stream_name: self.stream_name.clone(),
3942
params: self.local_params.clone(),
4043
}
4144
}
45+
46+
pub fn has_subscribed_manually(&self) -> bool {
47+
self.ttl.is_some()
48+
}
4249
}
4350

4451
/// A request sent from a PowerSync SDK to alter the subscriptions managed by this client.

dart/test/sync_stream_test.dart

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,5 +175,77 @@ void main() {
175175
expect(stored, containsPair('active', 1));
176176
expect(stored, containsPair('is_default', 0));
177177
});
178+
179+
syncTest('ttl', (controller) {
180+
db.execute(
181+
'INSERT INTO ps_stream_subscriptions (stream_name, ttl) VALUES (?, ?);',
182+
['my_stream', 3600]);
183+
184+
var startInstructions = control('start', null);
185+
expect(
186+
startInstructions,
187+
contains(
188+
containsPair(
189+
'EstablishSyncStream',
190+
containsPair(
191+
'request',
192+
containsPair(
193+
'streams',
194+
{
195+
'include_defaults': true,
196+
'subscriptions': [
197+
{
198+
'stream': 'my_stream',
199+
'parameters': null,
200+
'override_priority': null,
201+
'client_id': '1',
202+
}
203+
],
204+
},
205+
),
206+
),
207+
),
208+
),
209+
);
210+
211+
// Send a checkpoint containing the stream, increasing the TTL.
212+
control(
213+
'line_text',
214+
json.encode(
215+
checkpoint(
216+
lastOpId: 1,
217+
buckets: [],
218+
streams: [('my_stream', false)],
219+
),
220+
),
221+
);
222+
223+
final [row] = db.select('SELECT * FROM ps_stream_subscriptions');
224+
expect(row, containsPair('expires_at', 1740826800));
225+
control('stop', null);
226+
227+
// Elapse beyond end of TTL
228+
controller.elapse(const Duration(hours: 2));
229+
startInstructions = control('start', null);
230+
expect(
231+
startInstructions,
232+
contains(
233+
containsPair(
234+
'EstablishSyncStream',
235+
containsPair(
236+
'request',
237+
containsPair(
238+
'streams',
239+
{
240+
'include_defaults': true,
241+
// Outdated subscription should no longer be included.
242+
'subscriptions': isEmpty,
243+
},
244+
),
245+
),
246+
),
247+
),
248+
);
249+
});
178250
});
179251
}

0 commit comments

Comments
 (0)