Skip to content

Sync local: Refactor to avoid unwrap() #110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 15, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 84 additions & 56 deletions crates/core/src/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,13 @@ impl<'a> SyncOperation<'a> {
let statement = self.collect_full_operations()?;

// We cache the last insert and delete statements for each row
let mut last_insert_table: Option<String> = None;
let mut last_insert_statement: Option<ManagedStmt> = None;
struct CachedStatement {
table: String,
statement: ManagedStmt,
}

let mut last_delete_table: Option<String> = None;
let mut last_delete_statement: Option<ManagedStmt> = None;
let mut last_insert = None::<CachedStatement>;
let mut last_delete = None::<CachedStatement>;

let mut untyped_delete_statement: Option<ManagedStmt> = None;
let mut untyped_insert_statement: Option<ManagedStmt> = None;
Expand Down Expand Up @@ -173,35 +175,50 @@ impl<'a> SyncOperation<'a> {
// NULL data means no PUT operations found, so we delete the row.
if data.is_err() {
// DELETE
if last_delete_table.as_deref() != Some(&quoted) {
// Prepare statement when the table changed
last_delete_statement = Some(
self.db
let delete_statement = match &last_delete {
Some(stmt) if &*stmt.table == &*quoted => &stmt.statement,
_ => {
// Prepare statement when the table changed
let statement = self
.db
.prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
.into_db_result(self.db)?,
);
last_delete_table = Some(quoted.clone());
}
let delete_statement = last_delete_statement.as_mut().unwrap();
.into_db_result(self.db)?;

&last_delete
.insert(CachedStatement {
table: quoted.clone(),
statement,
})
.statement
}
};

delete_statement.reset()?;
delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
delete_statement.exec()?;
} else {
// INSERT/UPDATE
if last_insert_table.as_deref() != Some(&quoted) {
// Prepare statement when the table changed
last_insert_statement = Some(
self.db
let insert_statement = match &last_insert {
Some(stmt) if &*stmt.table == &*quoted => &stmt.statement,
_ => {
// Prepare statement when the table changed
let statement = self
.db
.prepare_v2(&format!(
"REPLACE INTO {}(id, data) VALUES(?, ?)",
quoted
))
.into_db_result(self.db)?,
);
last_insert_table = Some(quoted.clone());
}
let insert_statement = last_insert_statement.as_mut().unwrap();
.into_db_result(self.db)?;

&last_insert
.insert(CachedStatement {
table: quoted.clone(),
statement,
})
.statement
}
};

insert_statement.reset()?;
insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
Expand All @@ -211,32 +228,38 @@ impl<'a> SyncOperation<'a> {
} else {
if data.is_err() {
// DELETE
if untyped_delete_statement.is_none() {
// Prepare statement on first use
untyped_delete_statement = Some(
self.db
.prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?")
.into_db_result(self.db)?,
);
}
let delete_statement = untyped_delete_statement.as_mut().unwrap();
let delete_statement = match &untyped_delete_statement {
Some(stmt) => stmt,
None => {
// Prepare statement on first use
untyped_delete_statement.insert(
self.db
.prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?")
.into_db_result(self.db)?,
)
}
};

delete_statement.reset()?;
delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
delete_statement.exec()?;
} else {
// INSERT/UPDATE
if untyped_insert_statement.is_none() {
// Prepare statement on first use
untyped_insert_statement = Some(
self.db
.prepare_v2(
"REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)",
)
.into_db_result(self.db)?,
);
}
let insert_statement = untyped_insert_statement.as_mut().unwrap();
let insert_statement = match &untyped_insert_statement {
Some(stmt) => stmt,
None => {
// Prepare statement on first use
untyped_insert_statement.insert(
self.db
.prepare_v2(
"REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)",
)
.into_db_result(self.db)?,
)
}
};

insert_statement.reset()?;
insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
Expand Down Expand Up @@ -449,30 +472,35 @@ struct RawTableWithCachedStatements<'a> {
}

impl<'a> RawTableWithCachedStatements<'a> {
fn prepare_lazily<'b>(
db: *mut sqlite::sqlite3,
slot: &'b mut Option<PreparedPendingStatement<'a>>,
def: &'a PendingStatement,
) -> Result<&'b PreparedPendingStatement<'a>, PowerSyncError>
where
'a: 'b,
{
Ok(match slot {
Some(stmt) => stmt,
None => {
let stmt = PreparedPendingStatement::prepare(db, def)?;
slot.insert(stmt)
}
})
}

fn put_statement(
&mut self,
db: *mut sqlite::sqlite3,
) -> Result<&PreparedPendingStatement, PowerSyncError> {
let cache_slot = &mut self.cached_put;
if let None = cache_slot {
let stmt = PreparedPendingStatement::prepare(db, &self.definition.put)?;
*cache_slot = Some(stmt);
}

return Ok(cache_slot.as_ref().unwrap());
Self::prepare_lazily(db, &mut self.cached_put, &self.definition.put)
}

fn delete_statement(
&mut self,
db: *mut sqlite::sqlite3,
) -> Result<&PreparedPendingStatement, PowerSyncError> {
let cache_slot = &mut self.cached_delete;
if let None = cache_slot {
let stmt = PreparedPendingStatement::prepare(db, &self.definition.delete)?;
*cache_slot = Some(stmt);
}

return Ok(cache_slot.as_ref().unwrap());
Self::prepare_lazily(db, &mut self.cached_delete, &self.definition.delete)
}
}

Expand Down
Loading