Skip to content

Commit f7d4df3

Browse files
committed
Update last_synced_at for subscriptions
1 parent 21f4fc7 commit f7d4df3

File tree

5 files changed

+103
-35
lines changed

5 files changed

+103
-35
lines changed

crates/core/src/sync/streaming_sync.rs

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
BucketPriority,
3434
},
3535
};
36-
use sqlite_nostd::{self as sqlite};
36+
use sqlite_nostd::{self as sqlite, Connection, ResultCode};
3737

3838
use super::{
3939
interface::{Instruction, LogSeverity, StreamingSyncRequest, SyncControlRequest, SyncEvent},
@@ -309,9 +309,7 @@ impl StreamingSyncIteration {
309309
));
310310
};
311311
let target = &checkpoint.checkpoint;
312-
let result =
313-
self.adapter
314-
.sync_local(&self.state, target, None, &self.options.schema)?;
312+
let result = self.sync_local(target, None)?;
315313

316314
match result {
317315
SyncLocalResult::ChecksumFailure(checkpoint_result) => {
@@ -354,12 +352,7 @@ impl StreamingSyncIteration {
354352
"Received checkpoint complete without previous checkpoint",
355353
));
356354
};
357-
let result = self.adapter.sync_local(
358-
&self.state,
359-
&target.checkpoint,
360-
Some(priority),
361-
&self.options.schema,
362-
)?;
355+
let result = self.sync_local(&target.checkpoint, Some(priority))?;
363356

364357
match result {
365358
SyncLocalResult::ChecksumFailure(checkpoint_result) => {
@@ -505,12 +498,7 @@ impl StreamingSyncIteration {
505498
.map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?,
506499
SyncEvent::UploadFinished => {
507500
if let Some(checkpoint) = self.validated_but_not_applied.take() {
508-
let result = self.adapter.sync_local(
509-
&self.state,
510-
&checkpoint,
511-
None,
512-
&self.options.schema,
513-
)?;
501+
let result = self.sync_local(&checkpoint, None)?;
514502

515503
match result {
516504
SyncLocalResult::ChangesApplied => {
@@ -631,17 +619,13 @@ impl StreamingSyncIteration {
631619
if let Ok(index) =
632620
tracked_subscriptions.binary_search_by_key(subscription_id, |s| s.id)
633621
{
634-
resolved[index]
635-
.associated_buckets
636-
.push(bucket.bucket.clone());
622+
resolved[index].mark_associated_with_bucket(&bucket);
637623
}
638624
}
639625
}
640626
BucketSubscriptionReason::IsDefault { stream_name } => {
641627
if let Some(index) = default_stream_subscriptions.get(stream_name.as_str()) {
642-
resolved[*index]
643-
.associated_buckets
644-
.push(bucket.bucket.clone());
628+
resolved[*index].mark_associated_with_bucket(&bucket);
645629
}
646630
}
647631
BucketSubscriptionReason::Unknown => {}
@@ -651,6 +635,41 @@ impl StreamingSyncIteration {
651635
Ok(resolved)
652636
}
653637

638+
/// Performs a partial or a complete local sync.
639+
fn sync_local(
640+
&self,
641+
target: &OwnedCheckpoint,
642+
priority: Option<BucketPriority>,
643+
) -> Result<SyncLocalResult, PowerSyncError> {
644+
let result =
645+
self.adapter
646+
.sync_local(&self.state, target, priority, &self.options.schema)?;
647+
648+
if matches!(&result, SyncLocalResult::ChangesApplied) {
649+
// Update affected stream subscriptions to mark them as synced.
650+
let mut status = self.status.inner().borrow_mut();
651+
if let Some(ref mut streams) = status.streams {
652+
let stmt = self.adapter.db.prepare_v2(
653+
"UPDATE ps_stream_subscriptions SET last_synced_at = unixepoch() WHERE id = ? RETURNING last_synced_at",
654+
)?;
655+
656+
for stream in streams {
657+
if stream.is_in_priority(priority) {
658+
stmt.bind_int64(1, stream.id)?;
659+
if stmt.step()? == ResultCode::ROW {
660+
let timestamp = Timestamp(stmt.column_int64(0));
661+
stream.last_synced_at = Some(timestamp);
662+
}
663+
664+
stmt.reset()?;
665+
}
666+
}
667+
}
668+
}
669+
670+
Ok(result)
671+
}
672+
654673
/// Prepares a sync iteration by handling the initial [SyncEvent::Initialize].
655674
///
656675
/// This prepares a [StreamingSyncRequest] by fetching local sync state and the requested bucket

crates/core/src/sync/sync_status.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
use alloc::{boxed::Box, collections::btree_map::BTreeMap, rc::Rc, string::String, vec::Vec};
22
use core::{
33
cell::RefCell,
4+
cmp::min,
45
hash::{BuildHasher, Hash},
56
};
67
use rustc_hash::FxBuildHasher;
78
use serde::Serialize;
89
use sqlite_nostd::ResultCode;
910

1011
use crate::{
11-
sync::{storage_adapter::StorageAdapter, subscriptions::LocallyTrackedSubscription},
12+
sync::{
13+
checkpoint::OwnedBucketChecksum, storage_adapter::StorageAdapter,
14+
subscriptions::LocallyTrackedSubscription,
15+
},
1216
util::JsonString,
1317
};
1418

@@ -37,7 +41,7 @@ pub struct DownloadSyncStatus {
3741
/// When a download is active (that is, a `checkpoint` or `checkpoint_diff` line has been
3842
/// received), information about how far the download has progressed.
3943
pub downloading: Option<SyncDownloadProgress>,
40-
pub streams: Vec<ActiveStreamSubscription>,
44+
pub streams: Option<Vec<ActiveStreamSubscription>>,
4145
}
4246

4347
impl DownloadSyncStatus {
@@ -75,7 +79,7 @@ impl DownloadSyncStatus {
7579
self.mark_connected();
7680

7781
self.downloading = Some(progress);
78-
self.streams = subscriptions;
82+
self.streams = Some(subscriptions);
7983
}
8084

8185
/// Increments [SyncDownloadProgress] progress for the given [DataLine].
@@ -119,7 +123,7 @@ impl Default for DownloadSyncStatus {
119123
connecting: false,
120124
downloading: None,
121125
priority_status: Vec::new(),
122-
streams: Vec::new(),
126+
streams: None,
123127
}
124128
}
125129
}
@@ -137,6 +141,10 @@ impl SyncStatusContainer {
137141
}
138142
}
139143

144+
pub fn inner(&self) -> &Rc<RefCell<DownloadSyncStatus>> {
145+
&self.status
146+
}
147+
140148
/// Invokes a function to update the sync status, then emits an [Instruction::UpdateSyncStatus]
141149
/// if the function did indeed change the status.
142150
pub fn update<F: FnOnce(&mut DownloadSyncStatus) -> ()>(
@@ -262,9 +270,12 @@ impl SyncDownloadProgress {
262270

263271
#[derive(Serialize, Hash)]
264272
pub struct ActiveStreamSubscription {
273+
#[serde(skip)]
274+
pub id: i64,
265275
pub name: String,
266276
pub parameters: Option<Box<JsonString>>,
267277
pub associated_buckets: Vec<String>,
278+
pub priority: Option<BucketPriority>,
268279
pub active: bool,
269280
pub is_default: bool,
270281
pub expires_at: Option<Timestamp>,
@@ -274,13 +285,30 @@ pub struct ActiveStreamSubscription {
274285
impl ActiveStreamSubscription {
275286
pub fn from_local(local: &LocallyTrackedSubscription) -> Self {
276287
Self {
288+
id: local.id,
277289
name: local.stream_name.clone(),
278290
parameters: local.local_params.clone(),
279291
is_default: local.is_default,
292+
priority: None,
280293
associated_buckets: Vec::new(),
281294
active: local.active,
282295
expires_at: local.expires_at.clone().map(|e| Timestamp(e)),
283296
last_synced_at: local.last_synced_at.map(|e| Timestamp(e)),
284297
}
285298
}
299+
300+
pub fn mark_associated_with_bucket(&mut self, bucket: &OwnedBucketChecksum) {
301+
self.associated_buckets.push(bucket.bucket.clone());
302+
self.priority = Some(match self.priority {
303+
None => bucket.priority,
304+
Some(prio) => min(prio, bucket.priority),
305+
});
306+
}
307+
308+
pub fn is_in_priority(&self, prio: Option<BucketPriority>) -> bool {
309+
match prio {
310+
None => true,
311+
Some(prio) => self.priority >= Some(prio),
312+
}
313+
}
286314
}

dart/test/sync_stream_test.dart

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ void main() {
5858
}
5959

6060
group('default streams', () {
61-
test('are created on-demand', () {
61+
syncTest('are created on-demand', (_) {
6262
control('start', null);
6363
control(
6464
'line_text',
@@ -86,11 +86,28 @@ void main() {
8686
'active': true,
8787
'is_default': true,
8888
'expires_at': null,
89-
'last_synced_at': null
89+
'last_synced_at': null,
90+
'priority': 1,
9091
}
9192
],
9293
),
9394
);
95+
96+
control(
97+
'line_text',
98+
json.encode(checkpointComplete(priority: 1)),
99+
);
100+
101+
expect(
102+
lastStatus,
103+
containsPair(
104+
'streams',
105+
[containsPair('last_synced_at', 1740823200)],
106+
),
107+
);
108+
109+
final [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
110+
expect(stored, containsPair('last_synced_at', 1740823200));
94111
});
95112
});
96113
}

dart/test/sync_test.dart

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,7 @@ void _syncTests<T>({
126126
}
127127

128128
List<Object?> pushCheckpointComplete({int? priority, String lastOpId = '1'}) {
129-
return syncLine({
130-
priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete':
131-
{
132-
'last_op_id': lastOpId,
133-
if (priority != null) 'priority': priority,
134-
},
135-
});
129+
return syncLine(checkpointComplete(priority: priority, lastOpId: lastOpId));
136130
}
137131

138132
ResultSet fetchRows() {

dart/test/utils/test_utils.dart

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ Object checkpoint({
2121
};
2222
}
2323

24+
/// Creates a `checkpoint_complete` or `partial_checkpoint_complete` line.
25+
Object checkpointComplete({int? priority, String lastOpId = '1'}) {
26+
return {
27+
priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete': {
28+
'last_op_id': lastOpId,
29+
if (priority != null) 'priority': priority,
30+
},
31+
};
32+
}
33+
2434
Object bucketDescription(
2535
String name, {
2636
int checksum = 0,

0 commit comments

Comments
 (0)