Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #15 from MalteJanz/refactoring-for-readability
Browse files Browse the repository at this point in the history
Some refactoring for readability
  • Loading branch information
MalteJanz committed Jul 4, 2024
2 parents 179d0f5 + c2b4dc3 commit 7374013
Show file tree
Hide file tree
Showing 11 changed files with 380 additions and 318 deletions.
2 changes: 2 additions & 0 deletions src/api/filter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Data structures to build criteria objects for the shopware API

use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

Expand Down
8 changes: 4 additions & 4 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! Everything needed for communicating with the Shopware API

pub mod filter;

use crate::api::filter::{Criteria, CriteriaFilter};
use crate::config::Credentials;
use crate::config_file::Credentials;
use anyhow::anyhow;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{header, Client, Response, StatusCode};
Expand Down Expand Up @@ -98,9 +100,7 @@ impl SwClient {
Ok(())
}

pub async fn entity_schema(
&self,
) -> Result<serde_json::Map<String, serde_json::Value>, SwApiError> {
pub async fn entity_schema(&self) -> Result<Entity, SwApiError> {
// ToDo: implement retry on auth fail
let access_token = self.access_token.lock().unwrap().clone();
let response = {
Expand Down
64 changes: 64 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Definitions for the CLI commands, arguments and help texts
//!
//! Makes heavy use of https://docs.rs/clap/latest/clap/

use clap::{Parser, Subcommand};
use std::path::PathBuf;

#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct Cli {
#[command(subcommand)]
pub command: Commands,
}

#[derive(Subcommand)]
pub enum Commands {
/// Authenticate with a given shopware shop via integration admin API.
/// Credentials are stored in .credentials.toml in the current working directory.
Auth {
/// base URL of the shop
#[arg(short, long)]
domain: String,

/// access_key_id
#[arg(short, long)]
id: String,

/// access_key_secret
#[arg(short, long)]
secret: String,
},

/// Import data into shopware or export data to a file
Sync {
/// Mode (import or export)
#[arg(value_enum, short, long)]
mode: SyncMode,

/// Path to profile schema.yaml
#[arg(short, long)]
schema: PathBuf,

/// Path to data file
#[arg(short, long)]
file: PathBuf,

/// Maximum amount of entities, can be used for debugging
#[arg(short, long)]
limit: Option<u64>,

// Verbose output, used for debugging
// #[arg(short, long, action = ArgAction::SetTrue)]
// verbose: bool,
/// How many requests can be "in-flight" at the same time
#[arg(short, long, default_value = "8")]
in_flight_limit: usize,
},
}

#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum SyncMode {
Import,
Export,
}
14 changes: 14 additions & 0 deletions src/config.rs → src/config_file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
//! Definitions for the `schema.yaml` and `.credentials.toml` files
//!
//! Allows deserialization into a proper typed structure from these files
//! or also write these typed structures to a file (in case of `.credentials.toml`)
//!
//! Utilizes https://serde.rs/

use crate::api::filter::{CriteriaFilter, CriteriaSorting};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
Expand All @@ -12,15 +19,22 @@ pub struct Credentials {
#[derive(Debug, Deserialize)]
pub struct Schema {
pub entity: String,

#[serde(default = "Vec::new")]
pub filter: Vec<CriteriaFilter>,

#[serde(default = "Vec::new")]
pub sort: Vec<CriteriaSorting>,

/// Are unique thanks to `HashSet`
#[serde(default = "HashSet::new")]
pub associations: HashSet<String>,

pub mappings: Vec<Mapping>,

#[serde(default = "String::new")]
pub serialize_script: String,

#[serde(default = "String::new")]
pub deserialize_script: String,
}
Expand Down
3 changes: 3 additions & 0 deletions src/data/export.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Everything related to exporting data out of shopware

use crate::api::filter::Criteria;
use crate::data::transform::serialize_entity;
use crate::SyncContext;
Expand Down Expand Up @@ -68,6 +70,7 @@ async fn write_to_file(
csv_writer.write_record(get_header_line(context))?;

for handle in worker_handles {
// ToDo: we might want to handle the errors more gracefully here and don't stop on first error
let (page, rows) = handle.await??;
println!("writing page {}", page);

Expand Down
35 changes: 22 additions & 13 deletions src/data/import.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::api::{SwApiError, SyncAction};
//! Everything related to import data into shopware

use crate::api::{Entity, SwApiError, SyncAction};
use crate::data::transform::deserialize_row;
use crate::SyncContext;
use anyhow::anyhow;
use itertools::Itertools;
use std::sync::Arc;

Expand All @@ -11,27 +14,32 @@ pub async fn import(context: Arc<SyncContext>) -> anyhow::Result<()> {
.from_path(&context.file)?;
let headers = csv_reader.headers()?.clone();

// create an iterator, that processes (CSV) rows (StringRecord) into (usize, anyhow::Result<Entity>)
// where the former is the row index
let iter = csv_reader
.into_records()
.map(|r| {
let result = r.expect("failed reading CSV row");

deserialize_row(&headers, result, &context).expect("deserialize failed")
// ToDo improve error handling
.map(|r| match r {
Ok(row) => deserialize_row(&headers, row, &context),
Err(e) => Err(anyhow!(e)),
})
.enumerate()
.take(context.limit.unwrap_or(u64::MAX) as usize);

// iterate in chunks of 500 or less
let mut join_handles = vec![];
for sync_values in &iter.chunks(500) {
let (mut row_indices, mut chunk): (
Vec<usize>,
Vec<serde_json::Map<String, serde_json::Value>>,
) = sync_values.unzip();
let (mut row_indices, chunk): (Vec<usize>, Vec<anyhow::Result<Entity>>) =
sync_values.unzip();

// for now fail on first invalid row
// currently the most likely deserialization failure is not finding the column / CSV header
// ToDo: we might want to handle the errors more gracefully here and don't stop on first error
let mut valid_chunk = chunk.into_iter().collect::<anyhow::Result<Vec<Entity>>>()?;

// submit sync task
let context = Arc::clone(&context);
join_handles.push(tokio::spawn(async move {
match context.sw_client.sync(&context.schema.entity, SyncAction::Upsert, &chunk).await {
match context.sw_client.sync(&context.schema.entity, SyncAction::Upsert, &valid_chunk).await {
Ok(()) => Ok(()),
Err(SwApiError::Server(_, body)) => {
for err in body.errors.iter().rev() {
Expand All @@ -40,7 +48,7 @@ pub async fn import(context: Arc<SyncContext>) -> anyhow::Result<()> {
let entry: usize = entry_str.parse().expect("error pointer should contain usize");

let row_index = row_indices.remove(entry);
let row = chunk.remove(entry);
let row = valid_chunk.remove(entry);
println!(
"server validation error on row {}: {} Remaining pointer '{}' ignored payload:\n{}",
row_index + 2,
Expand All @@ -50,13 +58,14 @@ pub async fn import(context: Arc<SyncContext>) -> anyhow::Result<()> {
);
}
// retry
context.sw_client.sync(&context.schema.entity, SyncAction::Upsert, &chunk).await
context.sw_client.sync(&context.schema.entity, SyncAction::Upsert, &valid_chunk).await
},
Err(e) => Err(e),
}
}));
}

// wait for all the sync tasks to finish
for join_handle in join_handles {
join_handle.await??;
}
Expand Down
5 changes: 3 additions & 2 deletions src/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ mod import;
mod transform;
mod validate;

// reexport the important functions / structs as part of this module
pub use export::export;
pub use import::import;
pub use transform::prepare_scripting_environment;
pub use transform::ScriptingEnvironment;
pub use transform::script::prepare_scripting_environment;
pub use transform::script::ScriptingEnvironment;
pub use validate::validate_paths_for_entity;
Loading

0 comments on commit 7374013

Please sign in to comment.