Skip to content

Commit f55201e

Browse files
authored
Merge pull request #59 from CoLearn-Dev/sm_chunk
Storage macro: chunk compatibility mode, read_keys Bugfix: lock in vt_inbox
2 parents a7a3a94 + 0041e1c commit f55201e

File tree

9 files changed

+377
-16
lines changed

9 files changed

+377
-16
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "colink"
3-
version = "0.3.5"
3+
version = "0.3.6"
44
edition = "2021"
55
description = "CoLink Rust SDK"
66
license = "MIT"

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CoLink SDK helps both application and protocol developers access the functionali
99
Add this to your Cargo.toml:
1010
```toml
1111
[dependencies]
12-
colink = "0.3.5"
12+
colink = "0.3.6"
1313
```
1414

1515
## Getting Started

src/application.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ impl CoLink {
203203
}
204204

205205
pub async fn create_entry(&self, key_name: &str, payload: &[u8]) -> Result<String, Error> {
206-
let mut client = self._grpc_connect(&self.core_addr).await?;
207206
if key_name.contains('$') {
208207
#[cfg(feature = "storage_macro")]
209208
return self._sm_create_entry(key_name, payload).await;
@@ -215,6 +214,7 @@ impl CoLink {
215214
.into());
216215
}
217216

217+
let mut client = self._grpc_connect(&self.core_addr).await?;
218218
let request = generate_request(
219219
&self.jwt,
220220
StorageEntry {
@@ -269,7 +269,6 @@ impl CoLink {
269269
}
270270

271271
pub async fn update_entry(&self, key_name: &str, payload: &[u8]) -> Result<String, Error> {
272-
let mut client = self._grpc_connect(&self.core_addr).await?;
273272
if key_name.contains('$') {
274273
#[cfg(feature = "storage_macro")]
275274
return self._sm_update_entry(key_name, payload).await;
@@ -281,6 +280,7 @@ impl CoLink {
281280
.into());
282281
}
283282

283+
let mut client = self._grpc_connect(&self.core_addr).await?;
284284
let request = generate_request(
285285
&self.jwt,
286286
StorageEntry {
@@ -295,7 +295,6 @@ impl CoLink {
295295
}
296296

297297
pub async fn delete_entry(&self, key_name: &str) -> Result<String, Error> {
298-
let mut client = self._grpc_connect(&self.core_addr).await?;
299298
if key_name.contains('$') {
300299
#[cfg(feature = "storage_macro")]
301300
return self._sm_delete_entry(key_name).await;
@@ -307,6 +306,7 @@ impl CoLink {
307306
.into());
308307
}
309308

309+
let mut client = self._grpc_connect(&self.core_addr).await?;
310310
let request = generate_request(
311311
&self.jwt,
312312
StorageEntry {
@@ -324,6 +324,17 @@ impl CoLink {
324324
prefix: &str,
325325
include_history: bool,
326326
) -> Result<Vec<StorageEntry>, Error> {
327+
if prefix.contains('$') {
328+
#[cfg(feature = "storage_macro")]
329+
return self._sm_read_keys(prefix, include_history).await;
330+
#[cfg(not(feature = "storage_macro"))]
331+
return Err(format!(
332+
"Storage Macro feature not enabled, but found $ symbol in key name: {}",
333+
key_name
334+
)
335+
.into());
336+
}
337+
327338
let mut client = self._grpc_connect(&self.core_addr).await?;
328339
let request = generate_request(
329340
&self.jwt,

src/extensions/storage_macro.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod append;
22
mod chunk;
33
mod fs;
44
mod redis;
5+
use crate::StorageEntry;
56

67
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
78

@@ -53,6 +54,12 @@ impl crate::application::CoLink {
5354
}
5455

5556
pub(crate) async fn _sm_read_entry(&self, key_name: &str) -> Result<Vec<u8>, Error> {
57+
let key_name = if key_name.contains("::") {
58+
&key_name
59+
[key_name.find(':').unwrap() + 2..key_name.rfind('@').unwrap_or(key_name.len())]
60+
} else {
61+
key_name
62+
};
5663
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
5764
match macro_type.as_str() {
5865
"chunk" => self._read_entry_chunk(&string_before).await,
@@ -107,4 +114,39 @@ impl crate::application::CoLink {
107114
.into()),
108115
}
109116
}
117+
118+
pub(crate) async fn _sm_read_keys(
119+
&self,
120+
prefix: &str,
121+
include_history: bool,
122+
) -> Result<Vec<StorageEntry>, Error> {
123+
if include_history {
124+
return Err("Storage Macro: include_history is not supported.".into());
125+
}
126+
if !prefix.starts_with(&format!("{}::", self.get_user_id()?)) {
127+
return Err("prefix must start with the given user_id".into());
128+
}
129+
let key_name_prefix = &prefix[prefix.find(':').unwrap() + 2..];
130+
let (string_before, macro_type, string_after) = self._parse_macro(key_name_prefix);
131+
let key_list = match macro_type.as_str() {
132+
"redis" => self._read_keys_redis(&string_before, &string_after).await?,
133+
"fs" => self._read_keys_fs(&string_before, &string_after).await?,
134+
_ => {
135+
return Err(format!(
136+
"invalid storage macro, found {} in prefix {}",
137+
macro_type, key_name_prefix
138+
)
139+
.into())
140+
}
141+
};
142+
let mut res: Vec<StorageEntry> = Vec::new();
143+
for key in key_list {
144+
res.push(StorageEntry {
145+
key_name: Default::default(),
146+
key_path: format!("{}:{}@0", prefix, key),
147+
payload: Default::default(),
148+
});
149+
}
150+
Ok(res)
151+
}
110152
}

src/extensions/storage_macro/chunk.rs

Lines changed: 159 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,122 @@ impl crate::application::CoLink {
9797
Ok(chunk_paths_string)
9898
}
9999

100+
async fn _store_chunks_compatibility_mode(
101+
&self,
102+
payload: &[u8],
103+
key_name: &str,
104+
) -> Result<i32, Error> {
105+
let mut offset = 0;
106+
let mut chunk_id = 0;
107+
while offset < payload.len() {
108+
let chunk_size = if offset + CHUNK_SIZE > payload.len() {
109+
payload.len() - offset
110+
} else {
111+
CHUNK_SIZE
112+
};
113+
self.update_entry(
114+
&format!("{}:{}", key_name, chunk_id),
115+
&payload[offset..offset + chunk_size],
116+
)
117+
.await?;
118+
offset += chunk_size;
119+
chunk_id += 1;
120+
}
121+
Ok(chunk_id)
122+
}
123+
124+
async fn _append_chunks_compatibility_mode(
125+
&self,
126+
chunk_len: i32,
127+
payload: &[u8],
128+
key_name: &str,
129+
) -> Result<i32, Error> {
130+
let last_chunk_id = chunk_len - 1;
131+
let mut last_chunk = self
132+
.read_entry(&format!("{}:{}", key_name, last_chunk_id,))
133+
.await?;
134+
let mut offset = 0;
135+
let mut chunk_id = chunk_len;
136+
if last_chunk.len() < CHUNK_SIZE {
137+
let chunk_size = if payload.len() < CHUNK_SIZE - last_chunk.len() {
138+
payload.len()
139+
} else {
140+
CHUNK_SIZE - last_chunk.len()
141+
};
142+
last_chunk.append(&mut payload[..chunk_size].to_vec());
143+
self.update_entry(&format!("{}:{}", key_name, last_chunk_id), &last_chunk)
144+
.await?;
145+
offset = chunk_size;
146+
}
147+
while offset < payload.len() {
148+
let chunk_size = if offset + CHUNK_SIZE > payload.len() {
149+
payload.len() - offset
150+
} else {
151+
CHUNK_SIZE
152+
};
153+
self.update_entry(
154+
&format!("{}:{}", key_name, chunk_id),
155+
&payload[offset..offset + chunk_size],
156+
)
157+
.await?;
158+
offset += chunk_size;
159+
chunk_id += 1;
160+
}
161+
Ok(chunk_id)
162+
}
163+
164+
async fn _delete_chunks_compatibility_mode(&self, key_name: &str) -> Result<String, Error> {
165+
let metadata_key = format!("{}:chunk_metadata", key_name);
166+
let chunk_len = self.read_entry(&metadata_key).await?;
167+
let chunk_len = String::from_utf8_lossy(&chunk_len).parse::<i32>()?;
168+
let res = self.delete_entry(&metadata_key).await?;
169+
for i in 0..chunk_len {
170+
self.delete_entry(&format!("{}:{}", key_name, i)).await?;
171+
}
172+
Ok(res)
173+
}
174+
175+
async fn _chunk_lock_compatibility_mode(&self, key_name: &str) -> Result<(), Error> {
176+
loop {
177+
if self
178+
.create_entry(&format!("{}:chunk_lock", key_name), b"")
179+
.await
180+
.is_ok()
181+
{
182+
return Ok(());
183+
}
184+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
185+
}
186+
}
187+
188+
async fn _chunk_unlock_compatibility_mode(&self, key_name: &str) -> Result<(), Error> {
189+
self.delete_entry(&format!("{}:chunk_lock", key_name))
190+
.await?;
191+
Ok(())
192+
}
193+
100194
#[async_recursion]
101195
pub(crate) async fn _create_entry_chunk(
102196
&self,
103197
key_name: &str,
104198
payload: &[u8],
105199
) -> Result<String, Error> {
106200
let metadata_key = format!("{}:chunk_metadata", key_name);
201+
if key_name.contains('$') {
202+
self._chunk_lock_compatibility_mode(key_name).await?;
203+
if let Err(e) = self.create_entry(&metadata_key, b"0").await {
204+
self._chunk_unlock_compatibility_mode(key_name).await?;
205+
return Err(e);
206+
}
207+
let chunk_len = self
208+
._store_chunks_compatibility_mode(payload, key_name)
209+
.await?;
210+
let res = self
211+
.update_entry(&metadata_key, chunk_len.to_string().as_bytes())
212+
.await?;
213+
self._chunk_unlock_compatibility_mode(key_name).await?;
214+
return Ok(res);
215+
}
107216
// lock the metadata entry to prevent simultaneous writes
108217
let lock_token = self.lock(&metadata_key).await?;
109218
// use a closure to prevent locking forever caused by errors
@@ -114,7 +223,7 @@ impl crate::application::CoLink {
114223
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
115224
// store the chunk paths in the metadata entry and update metadata
116225
let response = self
117-
.create_entry(&metadata_key, &chunk_paths_string.into_bytes())
226+
.create_entry(&metadata_key, chunk_paths_string.as_bytes())
118227
.await?;
119228
Ok::<String, Error>(response)
120229
}
@@ -126,6 +235,22 @@ impl crate::application::CoLink {
126235
#[async_recursion]
127236
pub(crate) async fn _read_entry_chunk(&self, key_name: &str) -> Result<Vec<u8>, Error> {
128237
let metadata_key = format!("{}:chunk_metadata", key_name);
238+
if key_name.contains('$') {
239+
self._chunk_lock_compatibility_mode(key_name).await?;
240+
let res = async {
241+
let chunk_len = self.read_entry(&metadata_key).await?;
242+
let chunk_len = String::from_utf8_lossy(&chunk_len).parse::<i32>()?;
243+
let mut payload = Vec::new();
244+
for i in 0..chunk_len {
245+
let mut res = self.read_entry(&format!("{}:{}", key_name, i)).await?;
246+
payload.append(&mut res);
247+
}
248+
Ok::<Vec<u8>, Error>(payload)
249+
}
250+
.await;
251+
self._chunk_unlock_compatibility_mode(key_name).await?;
252+
return res;
253+
}
129254
let metadata_response = self.read_entry(&metadata_key).await?;
130255
let payload_string = String::from_utf8(metadata_response)?;
131256
let user_id = self.get_user_id()?;
@@ -149,6 +274,18 @@ impl crate::application::CoLink {
149274
payload: &[u8],
150275
) -> Result<String, Error> {
151276
let metadata_key = format!("{}:chunk_metadata", key_name);
277+
if key_name.contains('$') {
278+
self._chunk_lock_compatibility_mode(key_name).await?;
279+
let _ = self._delete_chunks_compatibility_mode(key_name).await;
280+
let chunk_len = self
281+
._store_chunks_compatibility_mode(payload, key_name)
282+
.await?;
283+
let res = self
284+
.update_entry(&metadata_key, chunk_len.to_string().as_bytes())
285+
.await?;
286+
self._chunk_unlock_compatibility_mode(key_name).await?;
287+
return Ok(res);
288+
}
152289
// lock the metadata entry to prevent simultaneous writes
153290
let lock_token = self.lock(&metadata_key).await?;
154291
// use a closure to prevent locking forever caused by errors
@@ -159,7 +296,7 @@ impl crate::application::CoLink {
159296
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
160297
// update the metadata entry
161298
let response = self
162-
.update_entry(&metadata_key, &chunk_paths_string.into_bytes())
299+
.update_entry(&metadata_key, chunk_paths_string.as_bytes())
163300
.await?;
164301
Ok::<String, Error>(response)
165302
}
@@ -175,6 +312,19 @@ impl crate::application::CoLink {
175312
payload: &[u8],
176313
) -> Result<String, Error> {
177314
let metadata_key = format!("{}:chunk_metadata", key_name);
315+
if key_name.contains('$') {
316+
self._chunk_lock_compatibility_mode(key_name).await?;
317+
let chunk_len = self.read_entry(&metadata_key).await?;
318+
let chunk_len = String::from_utf8_lossy(&chunk_len).parse::<i32>()?;
319+
let new_chunk_len = self
320+
._append_chunks_compatibility_mode(chunk_len, payload, key_name)
321+
.await?;
322+
let res = self
323+
.update_entry(&metadata_key, new_chunk_len.to_string().as_bytes())
324+
.await?;
325+
self._chunk_unlock_compatibility_mode(key_name).await?;
326+
return Ok(res);
327+
}
178328
// lock the metadata entry to prevent simultaneous writes
179329
let lock_token = self.lock(&metadata_key).await?;
180330
// use a closure to prevent locking forever caused by errors
@@ -189,7 +339,7 @@ impl crate::application::CoLink {
189339
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
190340
// update the metadata entry
191341
let response = self
192-
.update_entry(&metadata_key, &chunk_paths_string.into_bytes())
342+
.update_entry(&metadata_key, chunk_paths_string.as_bytes())
193343
.await?;
194344
Ok::<String, Error>(response)
195345
}
@@ -200,6 +350,12 @@ impl crate::application::CoLink {
200350

201351
#[async_recursion]
202352
pub(crate) async fn _delete_entry_chunk(&self, key_name: &str) -> Result<String, Error> {
353+
if key_name.contains('$') {
354+
self._chunk_lock_compatibility_mode(key_name).await?;
355+
let res = self._delete_chunks_compatibility_mode(key_name).await;
356+
self._chunk_unlock_compatibility_mode(key_name).await?;
357+
return res;
358+
}
203359
let metadata_key = format!("{}:chunk_metadata", key_name);
204360
let lock_token = self.lock(&metadata_key).await?;
205361
let res = self.delete_entry(&metadata_key).await;

src/extensions/storage_macro/fs.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ impl crate::application::CoLink {
1717
path += "/";
1818
path += &path_suffix;
1919
}
20-
println!("path: {}", path);
2120
let path = PathBuf::from(path);
2221
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
2322
Ok(path)
@@ -97,4 +96,19 @@ impl crate::application::CoLink {
9796
tokio::fs::remove_file(path).await?;
9897
Ok("ok".to_string())
9998
}
99+
100+
#[async_recursion]
101+
pub(crate) async fn _read_keys_fs(
102+
&self,
103+
path_key_name: &str,
104+
prefix: &str,
105+
) -> Result<Vec<String>, Error> {
106+
let path = self._sm_fs_get_path(path_key_name, prefix).await?;
107+
let mut key_list: Vec<String> = Vec::new();
108+
let mut dir = tokio::fs::read_dir(path).await?;
109+
while let Some(entry) = dir.next_entry().await? {
110+
key_list.push(entry.file_name().to_string_lossy().to_string());
111+
}
112+
Ok(key_list)
113+
}
100114
}

0 commit comments

Comments
 (0)