Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/revoltchat/backend into fea…
Browse files Browse the repository at this point in the history
…t/pushd
  • Loading branch information
IAmTomahawkx committed Sep 29, 2024
2 parents 8672388 + 5e1b2e1 commit 48735e9
Show file tree
Hide file tree
Showing 33 changed files with 340 additions and 135 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
if: github.event_name != 'pull_request'
strategy:
matrix:
project: [delta, bonfire]
project: [delta, bonfire, autumn]
name: Build ${{ matrix.project }} image
steps:
# Configure build environment
Expand Down Expand Up @@ -98,6 +98,10 @@ jobs:
"bonfire": {
"path": "crates/bonfire",
"tag": "${{ github.repository_owner }}/bonfire"
},
"autumn": {
"path": "crates/services/autumn",
"tag": "${{ github.repository_owner }}/autumn"
}
}
export_to: output
Expand Down
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions crates/bonfire/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ FROM ghcr.io/revoltchat/base:latest AS builder
FROM gcr.io/distroless/cc-debian12:nonroot
COPY --from=builder /home/rust/src/target/release/revolt-bonfire ./

EXPOSE 9000
EXPOSE 14703
USER nonroot
ENV HOST=0.0.0.0:9000
CMD ["./revolt-bonfire"]
69 changes: 22 additions & 47 deletions crates/bonfire/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use futures::{
FutureExt, SinkExt, StreamExt, TryStreamExt,
};
use redis_kiss::{PayloadType, REDIS_PAYLOAD_TYPE, REDIS_URI};
use revolt_config::report_internal_error;
use revolt_database::{
events::{client::EventV1, server::ClientMessage},
Database, User, UserHint,
Expand Down Expand Up @@ -99,27 +100,21 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr)
let user_id = state.cache.user_id.clone();

// Notify socket we have authenticated.
if let Err(err) = write.send(config.encode(&EventV1::Authenticated)).await {
error!("Failed to write: {err:?}");
sentry::capture_error(&err);
if report_internal_error!(write.send(config.encode(&EventV1::Authenticated)).await).is_err() {
return;
}

// Download required data to local cache and send Ready payload.
let ready_payload = match state
.generate_ready_payload(db, config.get_ready_payload_fields())
.await
{
let ready_payload = match report_internal_error!(
state
.generate_ready_payload(db, config.get_ready_payload_fields())
.await
) {
Ok(ready_payload) => ready_payload,
Err(err) => {
sentry::capture_error(&err);
return;
}
Err(_) => return,
};

if let Err(err) = write.send(config.encode(&ready_payload)).await {
error!("Failed to write: {err:?}");
sentry::capture_error(&err);
if report_internal_error!(write.send(config.encode(&ready_payload)).await).is_err() {
return;
}

Expand Down Expand Up @@ -214,21 +209,16 @@ async fn listener(
write: &Mutex<WsWriter>,
) {
let redis_config = RedisConfig::from_url(&REDIS_URI).unwrap();
let subscriber = match fred::types::Builder::from_config(redis_config).build_subscriber_client()
{
let subscriber = match report_internal_error!(
fred::types::Builder::from_config(redis_config).build_subscriber_client()
) {
Ok(subscriber) => subscriber,
Err(err) => {
error!("Failed to build a subscriber: {err:?}");
sentry::capture_error(&err);
return;
}
Err(_) => return,
};

if let Err(err) = subscriber.init().await {
error!("Failed to init subscriber: {err:?}");
sentry::capture_error(&err);
if report_internal_error!(subscriber.init().await).is_err() {
return;
};
}

// Handle Redis connection dropping
let (clean_up_s, clean_up_r) = async_channel::bounded(1);
Expand All @@ -249,17 +239,13 @@ async fn listener(
// Check for state changes for subscriptions.
match state.apply_state().await {
SubscriptionStateChange::Reset => {
if let Err(err) = subscriber.unsubscribe_all().await {
error!("Unsubscribe all failed: {err:?}");
sentry::capture_error(&err);
if report_internal_error!(subscriber.unsubscribe_all().await).is_err() {
break 'out;
}

let subscribed = state.subscribed.read().await;
for id in subscribed.iter() {
if let Err(err) = subscriber.subscribe(id).await {
error!("Subscribe failed: {err:?}");
sentry::capture_error(&err);
if report_internal_error!(subscriber.subscribe(id).await).is_err() {
break 'out;
}
}
Expand All @@ -272,9 +258,7 @@ async fn listener(
#[cfg(debug_assertions)]
info!("{addr:?} unsubscribing from {id}");

if let Err(err) = subscriber.unsubscribe(id).await {
error!("Unsubscribe failed: {err:?}");
sentry::capture_error(&err);
if report_internal_error!(subscriber.unsubscribe(id).await).is_err() {
break 'out;
}
}
Expand All @@ -283,9 +267,7 @@ async fn listener(
#[cfg(debug_assertions)]
info!("{addr:?} subscribing to {id}");

if let Err(err) = subscriber.subscribe(id).await {
error!("Subscribe failed: {err:?}");
sentry::capture_error(&err);
if report_internal_error!(subscriber.subscribe(id).await).is_err() {
break 'out;
}
}
Expand All @@ -310,13 +292,9 @@ async fn listener(
_ = t2 => {},
message = t1 => {
// Handle incoming events.
let message = match message {
let message = match report_internal_error!(message) {
Ok(message) => message,
Err(e) => {
error!("Error while consuming pub/sub messages: {e:?}");
sentry::capture_error(&e);
break 'out;
}
Err(_) => break 'out
};

let event = match *REDIS_PAYLOAD_TYPE {
Expand Down Expand Up @@ -393,10 +371,7 @@ async fn listener(
}
}

if let Err(err) = subscriber.quit().await {
error!("{}", err);
sentry::capture_error(&err);
}
report_internal_error!(subscriber.quit().await).ok();
}

#[allow(clippy::too_many_arguments)]
Expand Down
12 changes: 9 additions & 3 deletions crates/core/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ use futures_locks::RwLock;
use once_cell::sync::Lazy;
use serde::Deserialize;

pub use sentry::capture_error;
pub use sentry::{capture_error, capture_message, Level};

#[cfg(feature = "report-macros")]
#[macro_export]
macro_rules! report_error {
( $expr: expr, $error: ident $( $tt:tt )? ) => {
$expr
.inspect_err(|err| {
$crate::capture_error(err);
$crate::capture_message(
&format!("{err:?} ({}:{}:{})", file!(), line!(), column!()),
$crate::Level::Error,
);
})
.map_err(|_| ::revolt_result::create_error!($error))
};
Expand All @@ -26,7 +29,10 @@ macro_rules! report_internal_error {
( $expr: expr ) => {
$expr
.inspect_err(|err| {
$crate::capture_error(err);
$crate::capture_message(
&format!("{err:?} ({}:{}:{})", file!(), line!(), column!()),
$crate::Level::Error,
);
})
.map_err(|_| ::revolt_result::create_error!(InternalError))
};
Expand Down
4 changes: 3 additions & 1 deletion crates/core/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ default = ["mongodb", "async-std-runtime", "tasks"]

[dependencies]
# Core
revolt-config = { version = "0.7.16", path = "../config" }
revolt-config = { version = "0.7.16", path = "../config", features = [
"report-macros",
] }
revolt-result = { version = "0.7.16", path = "../result" }
revolt-models = { version = "0.7.16", path = "../models", features = [
"validator",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ pub async fn create_database(db: &MongoDb) {
"hash": 1_i32
},
"name": "hash"
},
{
"key": {
"used_for.id": 1_i32
},
"name": "used_for_id"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
bson::{doc, from_bson, from_document, to_document, Bson, DateTime, Document},
options::FindOptions,
},
Invite, MongoDb, DISCRIMINATOR_SEARCH_SPACE,
AbstractChannels, AbstractServers, Channel, Invite, MongoDb, DISCRIMINATOR_SEARCH_SPACE,
};
use bson::oid::ObjectId;
use futures::StreamExt;
Expand All @@ -20,7 +20,7 @@ struct MigrationInfo {
revision: i32,
}

pub const LATEST_REVISION: i32 = 29;
pub const LATEST_REVISION: i32 = 30;

pub async fn migrate_database(db: &MongoDb) {
let migrations = db.col::<Document>("migrations");
Expand Down Expand Up @@ -1139,6 +1139,76 @@ pub async fn run_migrations(db: &MongoDb, revision: i32) -> i32 {
.expect("Failed to create attachment_hashes index.");
}

if revision <= 29 {
info!("Running migration [revision 29 / 29-09-2024]: Add creator_id to webhooks.");

#[derive(serde::Serialize, serde::Deserialize)]
struct WebhookShell {
_id: String,
channel_id: String,
}

let invites = db
.db()
.collection::<WebhookShell>("channel_webhooks")
.find(doc! {}, None)
.await
.expect("webhooks")
.filter_map(|s| async { s.ok() })
.collect::<Vec<WebhookShell>>()
.await;

for invite in invites {
let channel = db.fetch_channel(&invite.channel_id).await.expect("channel");
let creator_id = match channel {
Channel::Group { owner, .. } => owner,
Channel::TextChannel { server, .. } | Channel::VoiceChannel { server, .. } => {
let server = db.fetch_server(&server).await.expect("server");
server.owner
}
_ => unreachable!("not server or group channel!"),
};

db.db()
.collection::<Document>("channel_webhooks")
.update_one(
doc! {
"_id": invite._id,
},
doc! {
"$set" : {
"creator_id": creator_id
}
},
None,
)
.await
.expect("update webhook");
}
}

if revision <= 30 {
info!("Running migration [revision 30 / 29-09-2024]: Add index for used_for.id to attachments.");

db.db()
.run_command(
doc! {
"createIndexes": "attachments",
"indexes": [
{
"key": {
"used_for.id": 1_i32
},
"name": "used_for_id"
}
]
},
None,
)
.await
.expect("Failed to create attachments index.");
}

// Need to migrate fields on attachments, change `user_id`, `object_id`, etc to `parent`.

// Reminder to update LATEST_REVISION when adding new migrations.
Expand Down
6 changes: 5 additions & 1 deletion crates/core/database/src/models/channel_webhooks/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ auto_derived_partial!(
#[serde(skip_serializing_if = "Option::is_none")]
pub avatar: Option<File>,

/// User that created this webhook
pub creator_id: String,

/// The channel this webhook belongs to
pub channel_id: String,

Expand All @@ -43,6 +46,7 @@ impl Default for Webhook {
id: Default::default(),
name: Default::default(),
avatar: None,
creator_id: Default::default(),
channel_id: Default::default(),
permissions: Default::default(),
token: Default::default(),
Expand Down Expand Up @@ -70,7 +74,7 @@ impl Webhook {
if self.token.as_deref() == Some(token) {
Ok(())
} else {
Err(create_error!(InvalidCredentials))
Err(create_error!(NotAuthenticated))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/database/src/models/channels/ops/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl AbstractChannels for MongoDb {

// Delete associated attachments
self.delete_many_attachments(doc! {
"object_id": &id
"used_for.id": &id
})
.await?;

Expand Down
Loading

0 comments on commit 48735e9

Please sign in to comment.