Skip to content

Commit 81247be

Browse files
authored
Merge pull request #27 from nociza/magic_mount
Storage Macro: Chunk
2 parents ad6832d + 2d5973b commit 81247be

File tree

7 files changed

+321
-3
lines changed

7 files changed

+321
-3
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "colink"
3-
version = "0.2.4"
3+
version = "0.2.5"
44
edition = "2021"
55
description = "CoLink Rust SDK"
66
license = "MIT"
@@ -10,6 +10,7 @@ documentation = "https://docs.rs/colink"
1010
repository = "https://github.com/CoLearn-Dev/colink-sdk-rust-dev"
1111

1212
[dependencies]
13+
async-recursion = "1.0.0"
1314
async-trait = "0.1"
1415
base64 = "0.13.0"
1516
chrono = "0.4"
@@ -33,10 +34,11 @@ prost-build = "0.10"
3334
tonic-build = "0.7"
3435

3536
[features]
36-
default = ["extensions", "remote_storage", "variable_transfer", "registry", "policy_module", "instant_server"]
37+
default = ["extensions", "remote_storage", "variable_transfer", "registry", "policy_module", "instant_server", "storage_macro"]
3738
extensions = []
3839
remote_storage = ["extensions"]
3940
variable_transfer = ["extensions", "remote_storage"]
4041
registry = []
4142
policy_module = []
4243
instant_server = []
44+
storage_macro = []

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CoLink SDK helps both application adnd protocol developers access the functional
99
Add this to your Cargo.toml:
1010
```toml
1111
[dependencies]
12-
colink = "0.2.4"
12+
colink = "0.2.5"
1313
```
1414

1515
## Getting Started
@@ -91,6 +91,9 @@ cargo run --example user_stop_protocol_operator <address> <user_jwt> <instance_i
9191
```
9292
cargo run --example user_wait_task <address> <user_jwt> <target_user_id>
9393
```
94+
```
95+
cargo run --example storage_macro_chunk <address> <user_jwt> <payload_size>
96+
```
9497

9598
### Protocol
9699
```

examples/storage_macro_chunk.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use colink::{decode_jwt_without_validation, CoLink};
2+
use rand::Rng;
3+
use std::env;
4+
5+
const CHUNK_SIZE: usize = 1024 * 1024; // use 1MB chunks
6+
7+
#[tokio::main]
8+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
9+
let args = env::args().skip(1).collect::<Vec<_>>();
10+
let addr = &args[0];
11+
let jwt = &args[1];
12+
let length = if args.len() > 2 {
13+
args[2].parse::<usize>().unwrap()
14+
} else {
15+
5e6 as usize // default to 5 * 10^6 bytes
16+
};
17+
let user_id = decode_jwt_without_validation(jwt).unwrap().user_id;
18+
println!("user_id: {}", user_id);
19+
let cl = CoLink::new(addr, jwt);
20+
let key_name = "storage_macro_demo:$chunk";
21+
let payload = rand::thread_rng()
22+
.sample_iter(&rand::distributions::Standard)
23+
.take(length)
24+
.collect::<Vec<u8>>();
25+
26+
// create
27+
println!("Creating entry...");
28+
let response = cl.create_entry(key_name, &payload.clone()).await?;
29+
println!("created entry at key name: {}", response);
30+
31+
// read
32+
println!("Reading entry...");
33+
let data = cl.read_entry(key_name).await?;
34+
assert_eq!(data, payload);
35+
println!(
36+
"Read payload of {}MB ({} bytes), verified to be same as bytes written",
37+
payload.len() as f32 / CHUNK_SIZE as f32,
38+
payload.len()
39+
);
40+
41+
// update
42+
println!("Updating entry...");
43+
let new_payload = rand::thread_rng()
44+
.sample_iter(&rand::distributions::Standard)
45+
.take(length / 2)
46+
.collect::<Vec<u8>>();
47+
let response = cl.update_entry(key_name, &new_payload.clone()).await?;
48+
println!("updated entry at key name: {}", response);
49+
50+
// read again to verify
51+
println!("Reading entry again...");
52+
let data = cl.read_entry(key_name).await?;
53+
assert_eq!(data, new_payload);
54+
println!(
55+
"Read payload of {}MB ({} bytes), verified to be same as the updated payload bytes",
56+
new_payload.len() as f32 / CHUNK_SIZE as f32,
57+
new_payload.len()
58+
);
59+
60+
// delete
61+
println!("Deleting entry...");
62+
cl.delete_entry(key_name).await?;
63+
Ok(())
64+
}

src/application.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,17 @@ impl CoLink {
159159

160160
pub async fn create_entry(&self, key_name: &str, payload: &[u8]) -> Result<String, Error> {
161161
let mut client = self._grpc_connect(&self.core_addr).await?;
162+
if key_name.contains('$') {
163+
#[cfg(feature = "storage_macro")]
164+
return self._sm_create_entry(key_name, payload).await;
165+
#[cfg(not(feature = "storage_macro"))]
166+
return Err(format!(
167+
"Storage Macro feature not enabled, but found $ symbol in key name: {}",
168+
key_name
169+
)
170+
.into());
171+
}
172+
162173
let request = generate_request(
163174
&self.jwt,
164175
StorageEntry {
@@ -186,6 +197,17 @@ impl CoLink {
186197
}
187198

188199
pub async fn read_entry(&self, key: &str) -> Result<Vec<u8>, Error> {
200+
if key.contains('$') {
201+
#[cfg(feature = "storage_macro")]
202+
return self._sm_read_entry(key).await;
203+
#[cfg(not(feature = "storage_macro"))]
204+
return Err(format!(
205+
"Storage Macro feature not enabled, but found $ symbol in key name: {}",
206+
key
207+
)
208+
.into());
209+
}
210+
189211
let storage_entry = if key.contains("::") {
190212
StorageEntry {
191213
key_path: key.to_string(),
@@ -203,6 +225,17 @@ impl CoLink {
203225

204226
pub async fn update_entry(&self, key_name: &str, payload: &[u8]) -> Result<String, Error> {
205227
let mut client = self._grpc_connect(&self.core_addr).await?;
228+
if key_name.contains('$') {
229+
#[cfg(feature = "storage_macro")]
230+
return self._sm_update_entry(key_name, payload).await;
231+
#[cfg(not(feature = "storage_macro"))]
232+
return Err(format!(
233+
"Storage Macro feature not enabled, but found $ symbol in key name: {}",
234+
key_name
235+
)
236+
.into());
237+
}
238+
206239
let request = generate_request(
207240
&self.jwt,
208241
StorageEntry {
@@ -218,6 +251,17 @@ impl CoLink {
218251

219252
pub async fn delete_entry(&self, key_name: &str) -> Result<String, Error> {
220253
let mut client = self._grpc_connect(&self.core_addr).await?;
254+
if key_name.contains('$') {
255+
#[cfg(feature = "storage_macro")]
256+
return self._sm_delete_entry(key_name).await;
257+
#[cfg(not(feature = "storage_macro"))]
258+
return Err(format!(
259+
"Storage Macro feature not enabled, but found $ symbol in key name: {}",
260+
key_name
261+
)
262+
.into());
263+
}
264+
221265
let request = generate_request(
222266
&self.jwt,
223267
StorageEntry {

src/extensions.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ mod read_or_wait;
1212
pub mod registry;
1313
#[cfg(feature = "remote_storage")]
1414
mod remote_storage;
15+
#[cfg(feature = "storage_macro")]
16+
mod storage_macro;
1517
#[cfg(feature = "extensions")]
1618
mod switch_to_generated_user;
1719
#[cfg(feature = "variable_transfer")]

src/extensions/storage_macro.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
mod chunk;
2+
3+
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
4+
5+
impl crate::application::CoLink {
6+
fn _parse_macro(&self, key_name: &str) -> (String, String, String) {
7+
let split_key = key_name.split(':').collect::<Vec<&str>>();
8+
let mut macro_type = String::new();
9+
for s in split_key.iter().rev() {
10+
if s.contains('$') {
11+
macro_type = s.to_string();
12+
break;
13+
}
14+
}
15+
let macro_type_splitter = format!(":{}", macro_type);
16+
let split_by_macro = key_name.split(&macro_type_splitter).collect::<Vec<&str>>();
17+
let mut string_after = split_by_macro[split_by_macro.len() - 1].to_string();
18+
if string_after.starts_with(':') {
19+
string_after = string_after[1..].to_string();
20+
}
21+
(
22+
split_by_macro[0..(split_by_macro.len() - 1)].join(&macro_type_splitter),
23+
macro_type.replace('$', ""),
24+
string_after,
25+
)
26+
}
27+
28+
pub(crate) async fn _sm_create_entry(
29+
&self,
30+
key_name: &str,
31+
payload: &[u8],
32+
) -> Result<String, Error> {
33+
let (string_before, macro_type, _) = self._parse_macro(key_name);
34+
match macro_type.as_str() {
35+
"chunk" => {
36+
self._create_entry_chunk(string_before.as_str(), payload)
37+
.await
38+
}
39+
_ => Err(format!(
40+
"invalid storage macro, found {} in key name {}",
41+
macro_type, key_name
42+
)
43+
.into()),
44+
}
45+
}
46+
47+
pub(crate) async fn _sm_read_entry(&self, key_name: &str) -> Result<Vec<u8>, Error> {
48+
let (string_before, macro_type, _) = self._parse_macro(key_name);
49+
match macro_type.as_str() {
50+
"chunk" => self._read_entry_chunk(string_before.as_str()).await,
51+
_ => Err(format!(
52+
"invalid storage macro, found {} in key name {}",
53+
macro_type, key_name
54+
)
55+
.into()),
56+
}
57+
}
58+
59+
pub(crate) async fn _sm_update_entry(
60+
&self,
61+
key_name: &str,
62+
payload: &[u8],
63+
) -> Result<String, Error> {
64+
let (string_before, macro_type, _) = self._parse_macro(key_name);
65+
match macro_type.as_str() {
66+
"chunk" => {
67+
self._update_entry_chunk(string_before.as_str(), payload)
68+
.await
69+
}
70+
_ => Err(format!(
71+
"invalid storage macro, found {} in key name {}",
72+
macro_type, key_name
73+
)
74+
.into()),
75+
}
76+
}
77+
78+
pub(crate) async fn _sm_delete_entry(&self, key_name: &str) -> Result<String, Error> {
79+
let (string_before, macro_type, _) = self._parse_macro(key_name);
80+
match macro_type.as_str() {
81+
"chunk" => self._delete_entry_chunk(string_before.as_str()).await,
82+
_ => Err(format!(
83+
"invalid storage macro, found {} in key name {}",
84+
macro_type, key_name
85+
)
86+
.into()),
87+
}
88+
}
89+
}

src/extensions/storage_macro/chunk.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
use crate::decode_jwt_without_validation;
2+
use async_recursion::async_recursion;
3+
4+
const CHUNK_SIZE: usize = 1024 * 1024; // use 1MB chunks
5+
6+
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
7+
8+
impl crate::application::CoLink {
9+
#[async_recursion]
10+
async fn _store_chunks(&self, payload: &[u8], key_name: &str) -> Result<Vec<String>, Error> {
11+
let mut offset = 0;
12+
let mut chunk_id = 0;
13+
let mut chunk_paths = Vec::new();
14+
while offset < payload.len() {
15+
let chunk_size = if offset + CHUNK_SIZE > payload.len() {
16+
payload.len() - offset
17+
} else {
18+
CHUNK_SIZE
19+
};
20+
let response = self
21+
.update_entry(
22+
&format!("{}:{}", key_name, chunk_id),
23+
&payload[offset..offset + chunk_size],
24+
)
25+
.await?;
26+
chunk_paths.push(response.split('@').last().unwrap().to_string()); // only store the timestamps
27+
offset += chunk_size;
28+
chunk_id += 1;
29+
}
30+
Ok(chunk_paths)
31+
}
32+
33+
fn _check_chunk_paths_size(&self, chunk_paths: Vec<String>) -> Result<String, Error> {
34+
let chunk_paths_string = chunk_paths.join(";");
35+
if chunk_paths_string.len() > CHUNK_SIZE {
36+
return Err(format!(
37+
"File too large: failed to store {} chunks references in metadata",
38+
chunk_paths.len()
39+
)
40+
.into());
41+
}
42+
Ok(chunk_paths_string)
43+
}
44+
45+
#[async_recursion]
46+
pub(crate) async fn _create_entry_chunk(
47+
&self,
48+
key_name: &str,
49+
payload: &[u8],
50+
) -> Result<String, Error> {
51+
let metadata_key = format!("{}:chunk_metadata", key_name);
52+
// lock the metadata entry to prevent simultaneous writes
53+
let lock_token = self.lock(&metadata_key.clone()).await?;
54+
// create the chunks and store them
55+
let chunk_paths = self._store_chunks(payload, key_name).await?;
56+
// make sure that the chunk paths are smaller than the maximum entry size
57+
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
58+
// store the chunk paths in the metadata entry and update metadata
59+
let response = self
60+
.create_entry(&metadata_key.clone(), &chunk_paths_string.into_bytes())
61+
.await?;
62+
self.unlock(lock_token).await?;
63+
Ok(response)
64+
}
65+
66+
#[async_recursion]
67+
pub(crate) async fn _read_entry_chunk(&self, key_name: &str) -> Result<Vec<u8>, Error> {
68+
let metadata_key = format!("{}:chunk_metadata", key_name);
69+
let metadata_response = self.read_entry(&metadata_key.clone()).await?;
70+
let payload_string = String::from_utf8(metadata_response.clone())?;
71+
let user_id = decode_jwt_without_validation(&self.jwt).unwrap().user_id;
72+
73+
// read the chunks into a single vector
74+
let chunks_paths = payload_string.split(';').collect::<Vec<&str>>();
75+
let mut payload = Vec::new();
76+
for (i, timestamp) in chunks_paths.iter().enumerate() {
77+
let response = self
78+
.read_entry(&format!("{}::{}:{}@{}", user_id, key_name, i, timestamp))
79+
.await?;
80+
payload.append(&mut response.clone());
81+
}
82+
Ok(payload)
83+
}
84+
85+
#[async_recursion]
86+
pub(crate) async fn _update_entry_chunk(
87+
&self,
88+
key_name: &str,
89+
payload: &[u8],
90+
) -> Result<String, Error> {
91+
let metadata_key = format!("{}:chunk_metadata", key_name);
92+
// lock the metadata entry to prevent simultaneous writes
93+
let lock_token = self.lock(&metadata_key.clone()).await?;
94+
// split payload into chunks and update the chunks
95+
let chunk_paths = self._store_chunks(payload, key_name).await?;
96+
// make sure that the chunk paths are smaller than the maximum entry size
97+
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
98+
// update the metadata entry
99+
let response = self
100+
.update_entry(&metadata_key.clone(), &chunk_paths_string.into_bytes())
101+
.await?;
102+
self.unlock(lock_token).await?;
103+
Ok(response)
104+
}
105+
106+
#[async_recursion]
107+
pub(crate) async fn _delete_entry_chunk(&self, key_name: &str) -> Result<String, Error> {
108+
let metadata_key = format!("{}:chunk_metadata", key_name);
109+
let lock_token = self.lock(&metadata_key.clone()).await?;
110+
let response = self.delete_entry(&metadata_key.clone()).await?;
111+
self.unlock(lock_token).await?;
112+
Ok(response)
113+
}
114+
}

0 commit comments

Comments
 (0)