Skip to content

Commit 21f4fc7

Browse files
committed
Test handling default streams
1 parent 76de4fa commit 21f4fc7

14 files changed

+219
-71
lines changed

crates/core/src/migrations.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::error::{PSResult, PowerSyncError};
1212
use crate::fix_data::apply_v035_fix;
1313
use crate::sync::BucketPriority;
1414

15-
pub const LATEST_VERSION: i32 = 10;
15+
pub const LATEST_VERSION: i32 = 11;
1616

1717
pub fn powersync_migrate(
1818
ctx: *mut sqlite::context,
@@ -387,22 +387,23 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
387387
if current_version < 11 && target_version >= 11 {
388388
let stmt = "\
389389
CREATE TABLE ps_stream_subscriptions (
390-
id NOT NULL INTEGER PRIMARY KEY,
390+
id INTEGER NOT NULL PRIMARY KEY,
391391
stream_name TEXT NOT NULL,
392392
active INTEGER NOT NULL DEFAULT FALSE,
393393
is_default INTEGER NOT NULL DEFAULT FALSE,
394394
local_priority INTEGER,
395395
local_params TEXT,
396396
ttl INTEGER,
397-
expires_at INTEGER
397+
expires_at INTEGER,
398+
last_synced_at INTEGER
398399
) STRICT;
399400
400401
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
401402
json_object('sql', 'todo down migration'),
402403
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
403404
));
404405
";
405-
local_db.exec_safe(stmt)?;
406+
local_db.exec_safe(stmt).into_db_result(local_db)?;
406407
}
407408

408409
Ok(())

crates/core/src/sync/storage_adapter.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ impl StorageAdapter {
282282
})?,
283283
ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?,
284284
expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?,
285+
last_synced_at: column_nullable(&stmt, 8, || Ok(stmt.column_int64(8)))?,
285286
})
286287
}
287288

@@ -296,8 +297,6 @@ impl StorageAdapter {
296297
while stmt.step()? == ResultCode::ROW {
297298
action(Self::read_stream_subscription(&stmt)?);
298299
}
299-
300-
stmt.finalize()?;
301300
Ok(())
302301
}
303302

crates/core/src/sync/streaming_sync.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ impl StreamingSyncIteration {
594594
if subscription.is_default {
595595
let found = tracked_subscriptions
596596
.iter()
597-
.filter(|s| s.stream_name == subscription.name)
597+
.filter(|s| s.stream_name == subscription.name && s.local_params.is_none())
598598
.next();
599599

600600
if found.is_none() {
@@ -606,10 +606,20 @@ impl StreamingSyncIteration {
606606

607607
debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id));
608608

609-
let mut resolved: Vec<ActiveStreamSubscription> = tracked_subscriptions
610-
.iter()
611-
.map(|e| ActiveStreamSubscription::from_local(e))
612-
.collect();
609+
let mut resolved: Vec<ActiveStreamSubscription> =
610+
Vec::with_capacity(tracked_subscriptions.len());
611+
// Map of stream name to index in resolved for stream subscriptions without custom
612+
// parameters. This simplifies the association from BucketSubscriptionReason::IsDefault to
613+
// stream subscriptions later.
614+
let mut default_stream_subscriptions = BTreeMap::<&str, usize>::new();
615+
616+
for (i, subscription) in tracked_subscriptions.iter().enumerate() {
617+
resolved.push(ActiveStreamSubscription::from_local(subscription));
618+
619+
if subscription.local_params.is_none() {
620+
default_stream_subscriptions.insert(&subscription.stream_name, i);
621+
}
622+
}
613623

614624
// TODO: Cleanup old default subscriptions?
615625

@@ -627,7 +637,13 @@ impl StreamingSyncIteration {
627637
}
628638
}
629639
}
630-
BucketSubscriptionReason::IsDefault { stream_name } => todo!(),
640+
BucketSubscriptionReason::IsDefault { stream_name } => {
641+
if let Some(index) = default_stream_subscriptions.get(stream_name.as_str()) {
642+
resolved[*index]
643+
.associated_buckets
644+
.push(bucket.bucket.clone());
645+
}
646+
}
631647
BucketSubscriptionReason::Unknown => {}
632648
}
633649
}

crates/core/src/sync/subscriptions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub struct LocallyTrackedSubscription {
2323
pub local_params: Option<Box<JsonString>>,
2424
pub ttl: Option<i64>,
2525
pub expires_at: Option<i64>,
26+
pub last_synced_at: Option<i64>,
2627
}
2728

2829
impl LocallyTrackedSubscription {

crates/core/src/sync/sync_status.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,6 @@ pub struct ActiveStreamSubscription {
268268
pub active: bool,
269269
pub is_default: bool,
270270
pub expires_at: Option<Timestamp>,
271-
pub has_synced: bool,
272271
pub last_synced_at: Option<Timestamp>,
273272
}
274273

@@ -281,8 +280,7 @@ impl ActiveStreamSubscription {
281280
associated_buckets: Vec::new(),
282281
active: local.active,
283282
expires_at: local.expires_at.clone().map(|e| Timestamp(e)),
284-
has_synced: false, // TODDO
285-
last_synced_at: None, // TODO
283+
last_synced_at: local.last_synced_at.map(|e| Timestamp(e)),
286284
}
287285
}
288286
}

crates/core/src/util.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pub fn quote_identifier_prefixed(prefix: &str, name: &str) -> String {
4141
return format!("\"{:}{:}\"", prefix, name.replace("\"", "\"\""));
4242
}
4343

44+
/// Calls [read] to read a column if it's not null, otherwise returns [None].
45+
#[inline]
4446
pub fn column_nullable<T, R: FnOnce() -> Result<T, PowerSyncError>>(
4547
stmt: &ManagedStmt,
4648
index: i32,

dart/test/error_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import 'dart:convert';
33
import 'package:sqlite3/common.dart';
44
import 'package:test/test.dart';
55

6-
import 'utils/matchers.dart';
76
import 'utils/native_test_utils.dart';
7+
import 'utils/test_utils.dart';
88

99
void main() {
1010
group('error reporting', () {

dart/test/goldens/simple_iteration.json

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"connected": false,
1010
"connecting": true,
1111
"priority_status": [],
12-
"downloading": null
12+
"downloading": null,
13+
"streams": []
1314
}
1415
}
1516
},
@@ -21,7 +22,11 @@
2122
"raw_data": true,
2223
"binary_data": true,
2324
"client_id": "test-test-test-test",
24-
"parameters": null
25+
"parameters": null,
26+
"streams": {
27+
"include_defaults": true,
28+
"subscriptions": []
29+
}
2530
}
2631
}
2732
}
@@ -59,7 +64,8 @@
5964
"target_count": 1
6065
}
6166
}
62-
}
67+
},
68+
"streams": []
6369
}
6470
}
6571
}
@@ -108,7 +114,8 @@
108114
"target_count": 1
109115
}
110116
}
111-
}
117+
},
118+
"streams": []
112119
}
113120
}
114121
}
@@ -146,7 +153,8 @@
146153
"has_synced": true
147154
}
148155
],
149-
"downloading": null
156+
"downloading": null,
157+
"streams": []
150158
}
151159
}
152160
}

dart/test/goldens/starting_stream.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
"connected": false,
1414
"connecting": true,
1515
"priority_status": [],
16-
"downloading": null
16+
"downloading": null,
17+
"streams": []
1718
}
1819
}
1920
},
@@ -27,6 +28,10 @@
2728
"client_id": "test-test-test-test",
2829
"parameters": {
2930
"foo": "bar"
31+
},
32+
"streams": {
33+
"include_defaults": true,
34+
"subscriptions": []
3035
}
3136
}
3237
}

dart/test/sync_stream_test.dart

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import 'dart:convert';
2+
3+
import 'package:file/local.dart';
4+
import 'package:sqlite3/common.dart';
5+
import 'package:sqlite3/sqlite3.dart';
6+
import 'package:sqlite3_test/sqlite3_test.dart';
7+
import 'package:test/test.dart';
8+
9+
import 'utils/native_test_utils.dart';
10+
import 'utils/test_utils.dart';
11+
12+
void main() {
13+
final vfs = TestSqliteFileSystem(
14+
fs: const LocalFileSystem(), name: 'vfs-stream-test');
15+
16+
setUpAll(() {
17+
loadExtension();
18+
sqlite3.registerVirtualFileSystem(vfs, makeDefault: false);
19+
});
20+
tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs));
21+
22+
late CommonDatabase db;
23+
Object? lastStatus;
24+
25+
setUp(() async {
26+
db = openTestDatabase(vfs: vfs)
27+
..select('select powersync_init();')
28+
..select('select powersync_replace_schema(?)', [json.encode(testSchema)])
29+
..execute('update ps_kv set value = ?2 where key = ?1',
30+
['client_id', 'test-test-test-test']);
31+
});
32+
33+
tearDown(() {
34+
db.dispose();
35+
});
36+
37+
List<Object?> control(String operation, Object? data) {
38+
db.execute('begin');
39+
ResultSet result;
40+
41+
try {
42+
result = db.select('SELECT powersync_control(?, ?)', [operation, data]);
43+
} catch (e) {
44+
db.execute('rollback');
45+
rethrow;
46+
}
47+
48+
db.execute('commit');
49+
final [row] = result;
50+
final instructions = jsonDecode(row.columnAt(0)) as List;
51+
for (final instruction in instructions) {
52+
if (instruction case {'UpdateSyncStatus': final status}) {
53+
lastStatus = status['status']!;
54+
}
55+
}
56+
57+
return instructions;
58+
}
59+
60+
group('default streams', () {
61+
test('are created on-demand', () {
62+
control('start', null);
63+
control(
64+
'line_text',
65+
json.encode(
66+
checkpoint(
67+
lastOpId: 1,
68+
buckets: [
69+
bucketDescription('a',
70+
subscriptions: 'my_default_stream', priority: 1),
71+
],
72+
streams: [('my_default_stream', true)],
73+
),
74+
),
75+
);
76+
77+
expect(
78+
lastStatus,
79+
containsPair(
80+
'streams',
81+
[
82+
{
83+
'name': 'my_default_stream',
84+
'parameters': null,
85+
'associated_buckets': ['a'],
86+
'active': true,
87+
'is_default': true,
88+
'expires_at': null,
89+
'last_synced_at': null
90+
}
91+
],
92+
),
93+
);
94+
});
95+
});
96+
}

0 commit comments

Comments
 (0)