Skip to content

Commit

Permalink
View for storing monitored entities (#1003)
Browse files Browse the repository at this point in the history
* build(migrations): add dependency on `common-models`

* Revert "build(migrations): add dependency on `common-models`"

This reverts commit 575962e.

* feat(migrations): create view for monitored entities

* feat(models): create entity for new view

* feat(migrations): add new column called `origin_collection_id`

* chore(models): add new column to entity

* feat(services/miscellaneous): use new table for querying

* ci: Run CI

* fix(database): do not make column nullable

* chore(migrations): remove useless semi-comma

* feat(frontend): focus on re-ordered exercise

* build(backend): upgrade basic versions
  • Loading branch information
IgnisDa committed Sep 4, 2024
1 parent 46b611a commit e56fef7
Show file tree
Hide file tree
Showing 11 changed files with 484 additions and 178 deletions.
446 changes: 355 additions & 91 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ resolver = "2"
anyhow = "=1.0.82"
apalis = { version = "=0.5.3", features = ["cron", "limit"] }
argon2 = "=0.6.0-pre.1"
async-graphql = { version = "=7.0.7", features = [
async-graphql = { version = "=7.0.9", features = [
"chrono",
"decimal",
"log",
Expand All @@ -68,10 +68,11 @@ markdown = "=1.0.0-alpha.18"
nanoid = "=0.4.0"
openidconnect = "=3.5.0"
regex = "=1.10.5"
rust_decimal = "=1.35.0"
rust_decimal_macros = "=1.35.0"
schematic = { version = "=0.16.6", features = [
rust_decimal = "=1.36.0"
rust_decimal_macros = "=1.36.0"
schematic = { version = "=0.17.4", features = [
"config",
"env",
"json",
"schema",
"toml",
Expand All @@ -80,6 +81,7 @@ schematic = { version = "=0.16.6", features = [
"type_rust_decimal",
"url",
"renderer_template",
"validate",
"yaml",
], default-features = false }
sea-orm = { version = "=1.0.0", features = [
Expand All @@ -95,8 +97,8 @@ sea-orm = { version = "=1.0.0", features = [
], default-features = false }
sea-orm-migration = "=1.0.0"
sea-query = "=0.31.0"
serde = { version = "=1.0.204", features = ["derive"] }
serde_json = "=1.0.120"
serde = { version = "=1.0.209", features = ["derive"] }
serde_json = "=1.0.127"
serde_with = { version = "=3.9.0", features = ["chrono_0_4"] }
slug = "=0.1.5"
strum = { version = "=0.26.2", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ anyhow = { workspace = true }
apalis = { workspace = true }
application-utils = { path = "../../crates/utils/application" }
async-graphql = { workspace = true }
async-graphql-axum = "=7.0.7"
async-graphql-axum = "=7.0.9"
axum = { workspace = true }
aws-sdk-s3 = { workspace = true }
background = { path = "../../crates/background" }
Expand Down
9 changes: 8 additions & 1 deletion apps/frontend/app/routes/_dashboard.fitness.$action.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,11 @@ type FuncStartTimer = (
triggeredBy: { exerciseIdentifier: string; setIdx: number },
) => void;

const focusOnExercise = (idx: number) => {
const exercise = document.getElementById(idx.toString());
exercise?.scrollIntoView({ behavior: "smooth" });
};

const ExerciseDisplay = (props: {
exerciseIdx: number;
startTimer: FuncStartTimer;
Expand Down Expand Up @@ -1572,10 +1577,12 @@ const ReorderDrawer = (props: { opened: boolean; onClose: () => void }) => {
>
<DragDropContext
onDragEnd={({ destination, source }) => {
const reorderedExerciseDestinationIndex = destination?.index || 0;
exerciseElementsHandlers.reorder({
from: source.index,
to: destination?.index || 0,
to: reorderedExerciseDestinationIndex,
});
focusOnExercise(reorderedExerciseDestinationIndex);
}}
>
<Droppable droppableId="dnd-list">
Expand Down
2 changes: 2 additions & 0 deletions crates/migrations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod m20240828_add_last_login_on_column_to_user;
mod m20240828_zz_add_columns_to_daily_user_activity;
mod m20240829_change_structure_for_exercise_extra_information;
mod m20240903_add_changes_for_user_to_collection_removal;
mod m20240904_create_monitored_entity;

pub use m20230410_create_metadata::Metadata as AliasedMetadata;
pub use m20230413_create_person::Person as AliasedPerson;
Expand Down Expand Up @@ -80,6 +81,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240828_zz_add_columns_to_daily_user_activity::Migration),
Box::new(m20240829_change_structure_for_exercise_extra_information::Migration),
Box::new(m20240903_add_changes_for_user_to_collection_removal::Migration),
Box::new(m20240904_create_monitored_entity::Migration),
]
}
}
36 changes: 36 additions & 0 deletions crates/migrations/src/m20240904_create_monitored_entity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use indoc::indoc;
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

pub static MONITORED_ENTITY_VIEW_CREATION_SQL: &str = indoc! { r#"
CREATE VIEW monitored_entity AS
SELECT
ute."user_id",
cte."entity_id",
cte."entity_lot",
cte."collection_id" AS "origin_collection_id"
FROM
"collection_to_entity" cte
JOIN
"collection" c ON cte."collection_id" = c."id"
JOIN
"user_to_entity" ute ON cte."collection_id" = ute."collection_id"
WHERE
c."name" = 'Monitoring'
"# };

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(MONITORED_ENTITY_VIEW_CREATION_SQL)
.await?;
Ok(())
}

async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/models/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod metadata_to_genre;
pub mod metadata_to_metadata;
pub mod metadata_to_metadata_group;
pub mod metadata_to_person;
pub mod monitored_entity;
pub mod notification_platform;
pub mod person;
pub mod queued_notification;
Expand Down
22 changes: 22 additions & 0 deletions crates/models/database/src/monitored_entity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.2

use enums::EntityLot;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "monitored_entity")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub user_id: String,
#[sea_orm(primary_key, auto_increment = false)]
pub entity_id: String,
#[sea_orm(primary_key, auto_increment = false)]
pub entity_lot: EntityLot,
pub origin_collection_id: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
1 change: 1 addition & 0 deletions crates/models/database/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub use super::metadata_to_genre::Entity as MetadataToGenre;
pub use super::metadata_to_metadata::Entity as MetadataToMetadata;
pub use super::metadata_to_metadata_group::Entity as MetadataToMetadataGroup;
pub use super::metadata_to_person::Entity as MetadataToPerson;
pub use super::monitored_entity::Entity as MonitoredEntity;
pub use super::notification_platform::Entity as NotificationPlatform;
pub use super::person::Entity as Person;
pub use super::queued_notification::Entity as QueuedNotification;
Expand Down
127 changes: 49 additions & 78 deletions crates/services/miscellaneous/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use database_models::{
calendar_event, collection, collection_to_entity,
functions::{associate_user_with_entity, get_user_to_entity_association},
genre, import_report, integration, metadata, metadata_group, metadata_to_genre,
metadata_to_metadata, metadata_to_metadata_group, metadata_to_person, notification_platform,
person,
metadata_to_metadata, metadata_to_metadata_group, metadata_to_person, monitored_entity,
notification_platform, person,
prelude::{
CalendarEvent, Collection, CollectionToEntity, Genre, ImportReport, Integration, Metadata,
MetadataGroup, MetadataToGenre, MetadataToMetadata, MetadataToMetadataGroup,
MetadataToPerson, NotificationPlatform, Person, QueuedNotification, Review, Seen, User,
UserToEntity,
MetadataToPerson, MonitoredEntity, NotificationPlatform, Person, QueuedNotification,
Review, Seen, User, UserToEntity,
},
queued_notification, review, seen, user, user_to_entity,
};
Expand Down Expand Up @@ -95,7 +95,7 @@ use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use sea_orm::{
prelude::DateTimeUtc, sea_query::NullOrdering, ActiveModelTrait, ActiveValue, ColumnTrait,
ConnectionTrait, DatabaseBackend, DatabaseConnection, DbBackend, EntityTrait, FromQueryResult,
ConnectionTrait, DatabaseBackend, DatabaseConnection, EntityTrait, FromQueryResult,
ItemsAndPagesNumber, Iterable, JoinType, ModelTrait, Order, PaginatorTrait, QueryFilter,
QueryOrder, QuerySelect, QueryTrait, RelationTrait, Statement, TransactionTrait,
};
Expand Down Expand Up @@ -189,8 +189,6 @@ impl MiscellaneousService {
}
}

type EntityBeingMonitoredByMap = HashMap<String, Vec<String>>;

impl MiscellaneousService {
pub async fn core_details(&self) -> CoreDetails {
let mut files_enabled = self.config.file_storage.is_enabled();
Expand Down Expand Up @@ -2806,8 +2804,9 @@ impl MiscellaneousService {
.await
.unwrap();
if !notifications.is_empty() {
let (meta_map, _, _) = self.get_entities_monitored_by().await.unwrap();
let users_to_notify = meta_map.get(metadata_id).cloned().unwrap_or_default();
let users_to_notify = self
.get_entities_monitored_by(metadata_id, EntityLot::Metadata)
.await?;
for notification in notifications {
for user_id in users_to_notify.iter() {
self.queue_media_state_changed_notification_for_user(user_id, &notification)
Expand Down Expand Up @@ -3461,8 +3460,26 @@ impl MiscellaneousService {
Ok(true)
}

async fn get_monitored_entities(
&self,
entity_lot: EntityLot,
) -> Result<HashMap<String, HashSet<String>>> {
let monitored_entities = MonitoredEntity::find()
.filter(monitored_entity::Column::EntityLot.eq(entity_lot))
.all(&self.db)
.await?;
let mut monitored_by = HashMap::new();
for entity in monitored_entities {
let user_ids = monitored_by
.entry(entity.entity_id)
.or_insert(HashSet::new());
user_ids.insert(entity.user_id);
}
Ok(monitored_by)
}

async fn update_watchlist_metadata_and_queue_notifications(&self) -> Result<()> {
let (meta_map, _, _) = self.get_entities_monitored_by().await?;
let meta_map = self.get_monitored_entities(EntityLot::Metadata).await?;
ryot_log!(
debug,
"Users to be notified for metadata state changes: {:?}",
Expand All @@ -3481,7 +3498,7 @@ impl MiscellaneousService {
}

async fn update_monitored_people_and_queue_notifications(&self) -> Result<()> {
let (_, _, person_map) = self.get_entities_monitored_by().await?;
let person_map = self.get_monitored_entities(EntityLot::Person).await?;
ryot_log!(
debug,
"Users to be notified for people state changes: {:?}",
Expand Down Expand Up @@ -4135,9 +4152,10 @@ impl MiscellaneousService {
)
})
.collect_vec();
let (meta_map, _, _) = self.get_entities_monitored_by().await?;
for (metadata_id, notification) in notifications.into_iter() {
let users_to_notify = meta_map.get(&metadata_id).cloned().unwrap_or_default();
let users_to_notify = self
.get_entities_monitored_by(&metadata_id, EntityLot::Metadata)
.await?;
for user in users_to_notify {
self.queue_media_state_changed_notification_for_user(&user, &notification)
.await?;
Expand Down Expand Up @@ -4225,8 +4243,9 @@ impl MiscellaneousService {
.await
.unwrap_or_default();
if !notifications.is_empty() {
let (_, _, person_map) = self.get_entities_monitored_by().await.unwrap();
let users_to_notify = person_map.get(&person_id).cloned().unwrap_or_default();
let users_to_notify = self
.get_entities_monitored_by(&person_id, EntityLot::Person)
.await?;
for notification in notifications {
for user_id in users_to_notify.iter() {
self.queue_media_state_changed_notification_for_user(user_id, &notification)
Expand All @@ -4240,72 +4259,24 @@ impl MiscellaneousService {

async fn get_entities_monitored_by(
&self,
) -> Result<(
EntityBeingMonitoredByMap,
EntityBeingMonitoredByMap,
EntityBeingMonitoredByMap,
)> {
#[derive(Debug, FromQueryResult, Clone, Default)]
struct UsersToBeNotified {
entity_id: String,
to_notify: Vec<String>,
}
let get_sql = |entity_type: &str| {
format!(
r#"
SELECT
m.id as entity_id,
array_agg(DISTINCT u.id) as to_notify
FROM {entity_type} m
JOIN collection_to_entity cte ON m.id = cte.{entity_type}_id
JOIN collection c ON cte.collection_id = c.id AND c.name = '{}'
JOIN "user" u ON c.user_id = u.id
GROUP BY m.id;
"#,
DefaultCollection::Monitoring
)
};
let meta_map: Vec<_> = UsersToBeNotified::find_by_statement(
Statement::from_sql_and_values(DbBackend::Postgres, get_sql("metadata"), []),
)
.all(&self.db)
.await?;
let meta_map = meta_map
.into_iter()
.map(|m| (m.entity_id, m.to_notify))
.collect::<EntityBeingMonitoredByMap>();
let meta_group_map: Vec<_> = UsersToBeNotified::find_by_statement(
Statement::from_sql_and_values(DbBackend::Postgres, get_sql("metadata_group"), []),
)
.all(&self.db)
.await?;
let meta_group_map = meta_group_map
.into_iter()
.map(|m| (m.entity_id, m.to_notify))
.collect::<EntityBeingMonitoredByMap>();
let person_map: Vec<_> = UsersToBeNotified::find_by_statement(
Statement::from_sql_and_values(DbBackend::Postgres, get_sql("person"), []),
)
.all(&self.db)
.await?;
let person_map = person_map
.into_iter()
.map(|m| (m.entity_id, m.to_notify))
.collect::<EntityBeingMonitoredByMap>();
Ok((meta_map, meta_group_map, person_map))
entity_id: &String,
entity_lot: EntityLot,
) -> Result<Vec<String>> {
let all_entities = MonitoredEntity::find()
.select_only()
.column(monitored_entity::Column::UserId)
.filter(monitored_entity::Column::EntityId.eq(entity_id))
.filter(monitored_entity::Column::EntityLot.eq(entity_lot))
.into_tuple::<String>()
.all(&self.db)
.await?;
Ok(all_entities)
}

pub async fn handle_review_posted_event(&self, event: ReviewPostedEvent) -> Result<()> {
let (meta_map, meta_group_map, person_map) = self.get_entities_monitored_by().await?;
let monitored_by = match event.entity_lot {
EntityLot::Metadata => meta_map.get(&event.obj_id).cloned().unwrap_or_default(),
EntityLot::MetadataGroup => meta_group_map
.get(&event.obj_id)
.cloned()
.unwrap_or_default(),
EntityLot::Person => person_map.get(&event.obj_id).cloned().unwrap_or_default(),
_ => vec![],
};
let monitored_by = self
.get_entities_monitored_by(&event.obj_id, event.entity_lot)
.await?;
let users = User::find()
.select_only()
.column(user::Column::Id)
Expand Down
2 changes: 1 addition & 1 deletion crates/services/notification/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ askama = "0.12.1"
common-utils = { path = "../../utils/common" }
config = { path = "../../config" }
convert_case = { workspace = true }
lettre = "=0.11.7"
lettre = "=0.11.8"
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down

0 comments on commit e56fef7

Please sign in to comment.