From c33827831bcd28aa6cadb665ab1e8e90daf005f8 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Tue, 19 Sep 2023 15:15:53 +0000 Subject: [PATCH 01/19] basic project setup --- cabal.project | 3 + nix/local-haskell-packages.nix | 1 + tools/rabbitmq-consumer/README.md | 3 + tools/rabbitmq-consumer/app/Main.hs | 23 ++++++ tools/rabbitmq-consumer/default.nix | 23 ++++++ .../rabbitmq-consumer/rabbitmq-consumer.cabal | 74 +++++++++++++++++ .../src/RabbitMQConsumer/Lib.hs | 82 +++++++++++++++++++ 7 files changed, 209 insertions(+) create mode 100644 tools/rabbitmq-consumer/README.md create mode 100644 tools/rabbitmq-consumer/app/Main.hs create mode 100644 tools/rabbitmq-consumer/default.nix create mode 100644 tools/rabbitmq-consumer/rabbitmq-consumer.cabal create mode 100644 tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs diff --git a/cabal.project b/cabal.project index 60b607ea01b..caf9ed18d1e 100644 --- a/cabal.project +++ b/cabal.project @@ -48,6 +48,7 @@ packages: , tools/db/repair-brig-clients-table/ , tools/db/service-backfill/ , tools/fedcalls/ + , tools/rabbitmq-consumer , tools/rex/ , tools/stern/ @@ -112,6 +113,8 @@ package polysemy-wire-zoo ghc-options: -Werror package proxy ghc-options: -Werror +package rabbitmq-consumer + ghc-options: -Werror package repair-handles ghc-options: -Werror package rex diff --git a/nix/local-haskell-packages.nix b/nix/local-haskell-packages.nix index f06c352c734..65a35a13994 100644 --- a/nix/local-haskell-packages.nix +++ b/nix/local-haskell-packages.nix @@ -52,6 +52,7 @@ repair-handles = hself.callPackage ../tools/db/repair-handles/default.nix { inherit gitignoreSource; }; service-backfill = hself.callPackage ../tools/db/service-backfill/default.nix { inherit gitignoreSource; }; fedcalls = hself.callPackage ../tools/fedcalls/default.nix { inherit gitignoreSource; }; + rabbitmq-consumer = hself.callPackage ../tools/rabbitmq-consumer/default.nix { inherit gitignoreSource; }; rex = hself.callPackage ../tools/rex/default.nix { inherit gitignoreSource; }; stern = hself.callPackage ../tools/stern/default.nix { inherit gitignoreSource; }; } diff --git a/tools/rabbitmq-consumer/README.md b/tools/rabbitmq-consumer/README.md new file mode 100644 index 00000000000..dbbf318b1a2 --- /dev/null +++ b/tools/rabbitmq-consumer/README.md @@ -0,0 +1,3 @@ +# RabbitMQ Consumer + +CLI tool to consume messages from a RabbitMQ queue diff --git a/tools/rabbitmq-consumer/app/Main.hs b/tools/rabbitmq-consumer/app/Main.hs new file mode 100644 index 00000000000..0379be52e30 --- /dev/null +++ b/tools/rabbitmq-consumer/app/Main.hs @@ -0,0 +1,23 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2023 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Main where + +import qualified RabbitMQConsumer.Lib as Lib + +main :: IO () +main = Lib.main diff --git a/tools/rabbitmq-consumer/default.nix b/tools/rabbitmq-consumer/default.nix new file mode 100644 index 00000000000..cbe27109962 --- /dev/null +++ b/tools/rabbitmq-consumer/default.nix @@ -0,0 +1,23 @@ +# WARNING: GENERATED FILE, DO NOT EDIT. +# This file is generated by running hack/bin/generate-local-nix-packages.sh and +# must be regenerated whenever local packages are added or removed, or +# dependencies are added or removed. +{ mkDerivation +, base +, gitignoreSource +, imports +, lib +, optparse-applicative +}: +mkDerivation { + pname = "rabbitmq-consumer"; + version = "1.0.0"; + src = gitignoreSource ./.; + isLibrary = true; + isExecutable = true; + libraryHaskellDepends = [ imports optparse-applicative ]; + executableHaskellDepends = [ base ]; + description = "CLI tool to consume messages from a RabbitMQ queue"; + license = lib.licenses.agpl3Only; + mainProgram = "rabbitmq-consumer"; +} diff --git a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal new file mode 100644 index 00000000000..1330721cdce --- /dev/null +++ b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal @@ -0,0 +1,74 @@ +cabal-version: 3.0 +name: rabbitmq-consumer +version: 1.0.0 +synopsis: CLI tool to consume messages from a RabbitMQ queue +category: Network +author: Wire Swiss GmbH +maintainer: Wire Swiss GmbH +copyright: (c) 2023 Wire Swiss GmbH +license: AGPL-3.0-only +build-type: Simple + +executable rabbitmq-consumer + main-is: Main.hs + build-depends: + , base + , rabbitmq-consumer + + hs-source-dirs: app + +library + hs-source-dirs: src + exposed-modules: RabbitMQConsumer.Lib + default-language: GHC2021 + ghc-options: + -Wall -Wpartial-fields -fwarn-tabs + -optP-Wno-nonportable-include-path + + build-depends: + , imports + , optparse-applicative + + default-extensions: + NoImplicitPrelude + AllowAmbiguousTypes + BangPatterns + ConstraintKinds + DataKinds + DefaultSignatures + DeriveFunctor + DeriveGeneric + DeriveLift + DeriveTraversable + DerivingStrategies + DerivingVia + DuplicateRecordFields + EmptyCase + FlexibleContexts + FlexibleInstances + FunctionalDependencies + GADTs + GeneralizedNewtypeDeriving + InstanceSigs + KindSignatures + LambdaCase + MultiParamTypeClasses + MultiWayIf + NamedFieldPuns + OverloadedLabels + OverloadedRecordDot + PackageImports + PatternSynonyms + PolyKinds + QuasiQuotes + RankNTypes + RecordWildCards + ScopedTypeVariables + StandaloneDeriving + TupleSections + TypeApplications + TypeFamilies + TypeFamilyDependencies + TypeOperators + UndecidableInstances + ViewPatterns diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs new file mode 100644 index 00000000000..d52bef2e59c --- /dev/null +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -0,0 +1,82 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2023 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module RabbitMQConsumer.Lib where + +import Imports +import Options.Applicative + +data Opts = Opts + { host :: String, + port :: Int, + username :: String, + password :: String, + vhost :: String + } + deriving (Show) + +optsParser :: Parser Opts +optsParser = + Opts + <$> strOption + ( long "host" + <> short 's' + <> metavar "HOST" + <> help "RabbitMQ host" + <> value "localhost" + <> showDefault + ) + <*> option + auto + ( long "port" + <> short 'p' + <> metavar "PORT" + <> help "RabbitMQ Port" + <> value 15762 + <> showDefault + ) + <*> strOption + ( long "username" + <> short 'u' + <> metavar "USERNAME" + <> help "RabbitMQ Username" + <> value "guest" + <> showDefault + ) + <*> strOption + ( long "password" + <> short 'w' + <> metavar "PASSWORD" + <> help "RabbitMQ Password" + <> value "guest" + <> showDefault + ) + <*> strOption + ( long "vhost" + <> short 'v' + <> metavar "VHOST" + <> help "RabbitMQ VHost" + <> value "/" + <> showDefault + ) + +main :: IO () +main = do + opts <- execParser (info (helper <*> optsParser) desc) + putStrLn $ "Connecting to RabbitMQ at " <> show opts + where + desc = header "rabbitmq-consumer" <> progDesc "CLI tool to consume messages from a RabbitMQ queue" <> fullDesc From b71968ea82fb77a1e3cc0d1630f9c91ea257e879 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Wed, 20 Sep 2023 13:22:13 +0000 Subject: [PATCH 02/19] wip --- .../rabbitmq-consumer/rabbitmq-consumer.cabal | 4 ++ .../src/RabbitMQConsumer/Lib.hs | 62 +++++++++++++++---- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal index 1330721cdce..2dd4cccb557 100644 --- a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal +++ b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal @@ -26,7 +26,11 @@ library -optP-Wno-nonportable-include-path build-depends: + , amqp + , base + , bytestring , imports + , network , optparse-applicative default-extensions: diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index d52bef2e59c..38f957929f3 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -14,18 +14,53 @@ -- -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . +{-# LANGUAGE OverloadedStrings #-} module RabbitMQConsumer.Lib where +import Data.ByteString.Lazy.Char8 qualified as BL import Imports +import Network.AMQP +import Network.Socket import Options.Applicative +main :: IO () +main = do + opts <- execParser (info (helper <*> optsParser) desc) + conn <- openConnection' opts.host opts.port opts.vhost opts.username opts.password + chan <- openChannel conn + + _ <- consumeMsgs chan opts.queue Ack myCallback + + mMsg <- getMsg chan Ack opts.queue + case mMsg of + Nothing -> putStrLn "no message" + Just (msg, env) -> do + putStrLn $ "received message:" + putStrLn $ BL.unpack (msgBody msg) + let requeue = True + rejectEnv env requeue + closeConnection conn + putStrLn "connection closed" + where + desc = header "rabbitmq-consumer" <> progDesc "CLI tool to consume messages from a RabbitMQ queue" <> fullDesc + +myCallback :: (Message, Envelope) -> IO () +myCallback (msg, env) = do + putStrLn $ "received message: " <> BL.unpack (msgBody msg) + -- acknowledge receiving the message + putStrLn $ "received message:" + putStrLn $ BL.unpack (msgBody msg) + let requeue = True + rejectEnv env requeue + data Opts = Opts { host :: String, - port :: Int, - username :: String, - password :: String, - vhost :: String + port :: PortNumber, + username :: Text, + password :: Text, + vhost :: Text, + queue :: Text } deriving (Show) @@ -46,7 +81,7 @@ optsParser = <> short 'p' <> metavar "PORT" <> help "RabbitMQ Port" - <> value 15762 + <> value 5672 <> showDefault ) <*> strOption @@ -62,7 +97,7 @@ optsParser = <> short 'w' <> metavar "PASSWORD" <> help "RabbitMQ Password" - <> value "guest" + <> value "alpaca-grapefruit" <> showDefault ) <*> strOption @@ -73,10 +108,11 @@ optsParser = <> value "/" <> showDefault ) - -main :: IO () -main = do - opts <- execParser (info (helper <*> optsParser) desc) - putStrLn $ "Connecting to RabbitMQ at " <> show opts - where - desc = header "rabbitmq-consumer" <> progDesc "CLI tool to consume messages from a RabbitMQ queue" <> fullDesc + <*> strOption + ( long "queue" + <> short 'q' + <> metavar "QUEUE" + <> help "RabbitMQ Queue" + <> value "test" + <> showDefault + ) From b3c9f26c7cf160bb79b907b5efef57c00a126ed0 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 25 Sep 2023 10:10:12 +0000 Subject: [PATCH 03/19] wip --- tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index 38f957929f3..5ad6503fc1a 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -32,14 +32,7 @@ main = do _ <- consumeMsgs chan opts.queue Ack myCallback - mMsg <- getMsg chan Ack opts.queue - case mMsg of - Nothing -> putStrLn "no message" - Just (msg, env) -> do - putStrLn $ "received message:" - putStrLn $ BL.unpack (msgBody msg) - let requeue = True - rejectEnv env requeue + threadDelay $ 10 * 1000 * 1000 -- 10 seconds closeConnection conn putStrLn "connection closed" where From 6486c16d986a0e9c01877797d98947fd93917e7c Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Tue, 26 Sep 2023 12:13:04 +0000 Subject: [PATCH 04/19] wip --- integration/test/Test/Federation.hs | 4 ++++ .../rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs | 15 ++++++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/integration/test/Test/Federation.hs b/integration/test/Test/Federation.hs index 97ec011f028..f750af78f76 100644 --- a/integration/test/Test/Federation.hs +++ b/integration/test/Test/Federation.hs @@ -123,6 +123,10 @@ testNotificationsForOfflineBackends = do delUserDeletedNotif <- nPayload $ awaitMatch 10 isDeleteUserNotif ws objQid delUserDeletedNotif `shouldMatch` objQid delUser + -- TODO(leif): remove + putStrLn "Press enter to continue..." + _ <- getLine + runCodensity (startDynamicBackend downBackend mempty) $ \_ -> do newMsgNotif <- awaitNotification downUser1 downClient1 noValue 5 isNewMessageNotif newMsgNotif %. "payload.0.qualified_conversation" `shouldMatch` objQidObject upBackendConv diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index 5ad6503fc1a..c61a78700fa 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -21,6 +21,7 @@ module RabbitMQConsumer.Lib where import Data.ByteString.Lazy.Char8 qualified as BL import Imports import Network.AMQP +import Network.AMQP.Types (FieldTable (..)) import Network.Socket import Options.Applicative @@ -29,8 +30,12 @@ main = do opts <- execParser (info (helper <*> optsParser) desc) conn <- openConnection' opts.host opts.port opts.vhost opts.username opts.password chan <- openChannel conn + recoverMsgs chan True - _ <- consumeMsgs chan opts.queue Ack myCallback + let abort = cancelConsumer chan + + _tag <- consumeMsgs' chan opts.queue Ack (myCallback abort) abort (FieldTable mempty) + putStrLn "waiting for messages..." threadDelay $ 10 * 1000 * 1000 -- 10 seconds closeConnection conn @@ -38,14 +43,10 @@ main = do where desc = header "rabbitmq-consumer" <> progDesc "CLI tool to consume messages from a RabbitMQ queue" <> fullDesc -myCallback :: (Message, Envelope) -> IO () -myCallback (msg, env) = do - putStrLn $ "received message: " <> BL.unpack (msgBody msg) - -- acknowledge receiving the message +myCallback :: (ConsumerTag -> IO ()) -> (Message, Envelope) -> IO () +myCallback abort (msg, _env) = do putStrLn $ "received message:" putStrLn $ BL.unpack (msgBody msg) - let requeue = True - rejectEnv env requeue data Opts = Opts { host :: String, From 6d28fb275ba2224ea4c5b3534c548867fe42cd4c Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Tue, 26 Sep 2023 13:08:16 +0000 Subject: [PATCH 05/19] wip --- integration/test/Test/Federation.hs | 4 --- .../src/RabbitMQConsumer/Lib.hs | 27 +++++++++++-------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/integration/test/Test/Federation.hs b/integration/test/Test/Federation.hs index f750af78f76..97ec011f028 100644 --- a/integration/test/Test/Federation.hs +++ b/integration/test/Test/Federation.hs @@ -123,10 +123,6 @@ testNotificationsForOfflineBackends = do delUserDeletedNotif <- nPayload $ awaitMatch 10 isDeleteUserNotif ws objQid delUserDeletedNotif `shouldMatch` objQid delUser - -- TODO(leif): remove - putStrLn "Press enter to continue..." - _ <- getLine - runCodensity (startDynamicBackend downBackend mempty) $ \_ -> do newMsgNotif <- awaitNotification downUser1 downClient1 noValue 5 isNewMessageNotif newMsgNotif %. "payload.0.qualified_conversation" `shouldMatch` objQidObject upBackendConv diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index c61a78700fa..ed27ce58670 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -21,32 +21,37 @@ module RabbitMQConsumer.Lib where import Data.ByteString.Lazy.Char8 qualified as BL import Imports import Network.AMQP -import Network.AMQP.Types (FieldTable (..)) import Network.Socket import Options.Applicative main :: IO () main = do opts <- execParser (info (helper <*> optsParser) desc) + done <- newEmptyMVar conn <- openConnection' opts.host opts.port opts.vhost opts.username opts.password chan <- openChannel conn recoverMsgs chan True - - let abort = cancelConsumer chan - - _tag <- consumeMsgs' chan opts.queue Ack (myCallback abort) abort (FieldTable mempty) - putStrLn "waiting for messages..." - - threadDelay $ 10 * 1000 * 1000 -- 10 seconds + void $ consumeMsgs chan opts.queue Ack (myCallback opts done) + takeMVar done closeConnection conn putStrLn "connection closed" where desc = header "rabbitmq-consumer" <> progDesc "CLI tool to consume messages from a RabbitMQ queue" <> fullDesc -myCallback :: (ConsumerTag -> IO ()) -> (Message, Envelope) -> IO () -myCallback abort (msg, _env) = do - putStrLn $ "received message:" +myCallback :: Opts -> MVar () -> (Message, Envelope) -> IO () +myCallback opts done (msg, env) = do + putStrLn $ "received message (vhost=" <> cs opts.vhost <> ") (queue=" <> cs opts.queue <> "):\n" putStrLn $ BL.unpack (msgBody msg) + putStrLn $ "\ntype 'drop' to drop the message and terminate, or press enter to terminate without dropping the message" + input <- getLine + if input == "drop" + then do + ackEnv env + putStrLn "message dropped" + else putStrLn "message not dropped" + putMVar done () + -- block and stop processing any more messages + forever $ threadDelay maxBound data Opts = Opts { host :: String, From dafdb0d4f742074e47aa24e5b5668ca2b30d40e6 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Thu, 28 Sep 2023 10:57:11 +0000 Subject: [PATCH 06/19] subcommands --- tools/rabbitmq-consumer/README.md | 29 +++- .../rabbitmq-consumer/rabbitmq-consumer.cabal | 1 + .../src/RabbitMQConsumer/Lib.hs | 124 +++++++++++++++--- 3 files changed, 134 insertions(+), 20 deletions(-) diff --git a/tools/rabbitmq-consumer/README.md b/tools/rabbitmq-consumer/README.md index dbbf318b1a2..387285946ec 100644 --- a/tools/rabbitmq-consumer/README.md +++ b/tools/rabbitmq-consumer/README.md @@ -1,3 +1,30 @@ # RabbitMQ Consumer -CLI tool to consume messages from a RabbitMQ queue +```txt +rabbitmq-consumer + +Usage: rabbitmq-consumer [-s|--host HOST] [-p|--port PORT] + [-u|--username USERNAME] [-w|--password PASSWORD] + [-v|--vhost VHOST] [-q|--queue QUEUE] + [-t|--timeout TIMEOUT] COMMAND + + CLI tool to consume messages from a RabbitMQ queue + +Available options: + -h,--help Show this help text + -s,--host HOST RabbitMQ host (default: "localhost") + -p,--port PORT RabbitMQ Port (default: 5672) + -u,--username USERNAME RabbitMQ Username (default: "guest") + -w,--password PASSWORD RabbitMQ Password (default: "alpaca-grapefruit") + -v,--vhost VHOST RabbitMQ VHost (default: "/") + -q,--queue QUEUE RabbitMQ Queue (default: "test") + -t,--timeout TIMEOUT Timeout in seconds. The command will timeout if no + messages are received within this time. This can + happen when the queue is empty, or when we lose the + single active consumer race. (default: 10) + +Available commands: + head Print the first message in the queue + drop-head Drop the first message in the queue + interactive Interactively drop the first message from the queue +``` diff --git a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal index 2dd4cccb557..c3cdf1de03a 100644 --- a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal +++ b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal @@ -26,6 +26,7 @@ library -optP-Wno-nonportable-include-path build-depends: + , aeson , amqp , base , bytestring diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index ed27ce58670..f96e3daa432 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -18,6 +18,7 @@ module RabbitMQConsumer.Lib where +import Data.Aeson (FromJSON, decode) import Data.ByteString.Lazy.Char8 qualified as BL import Imports import Network.AMQP @@ -27,31 +28,67 @@ import Options.Applicative main :: IO () main = do opts <- execParser (info (helper <*> optsParser) desc) - done <- newEmptyMVar conn <- openConnection' opts.host opts.port opts.vhost opts.username opts.password chan <- openChannel conn - recoverMsgs chan True - void $ consumeMsgs chan opts.queue Ack (myCallback opts done) + qos chan 0 1 False + done <- newEmptyMVar + runTimerAsync done opts.timeoutSec + case opts.cmd of + Head -> void $ consumeMsgs chan opts.queue Ack (printHead done opts) + Interactive -> void $ consumeMsgs chan opts.queue Ack (interactive done opts) + DropHead dhOpts -> void $ consumeMsgs chan opts.queue Ack (dropHead done opts dhOpts) takeMVar done closeConnection conn putStrLn "connection closed" where desc = header "rabbitmq-consumer" <> progDesc "CLI tool to consume messages from a RabbitMQ queue" <> fullDesc -myCallback :: Opts -> MVar () -> (Message, Envelope) -> IO () -myCallback opts done (msg, env) = do - putStrLn $ "received message (vhost=" <> cs opts.vhost <> ") (queue=" <> cs opts.queue <> "):\n" - putStrLn $ BL.unpack (msgBody msg) - putStrLn $ "\ntype 'drop' to drop the message and terminate, or press enter to terminate without dropping the message" - input <- getLine - if input == "drop" - then do - ackEnv env - putStrLn "message dropped" - else putStrLn "message not dropped" - putMVar done () - -- block and stop processing any more messages - forever $ threadDelay maxBound + printHead :: MVar () -> Opts -> (Message, Envelope) -> IO () + printHead done opts (msg, _env) = do + putStrLn $ displayMessage opts msg + void $ tryPutMVar done () + + dropHead :: MVar () -> Opts -> DropHeadOpts -> (Message, Envelope) -> IO () + dropHead done opts dhOpts (msg, env) = do + putStrLn $ displayMessage opts msg + case decode @MessageJSON msg.msgBody of + Nothing -> putStrLn "failed to decode message body" + Just json -> do + if json.path == dhOpts.path + then do + putStrLn "dropping message" + nackEnv env + else do + putStrLn "path does not match. keeping message" + void $ tryPutMVar done () + + interactive :: MVar () -> Opts -> (Message, Envelope) -> IO () + interactive done opts (msg, env) = do + putStrLn $ displayMessage opts msg + putStrLn $ "type 'drop' to drop the message and terminate, or press enter to terminate without dropping the message" + input <- getLine + if input == "drop" + then do + ackEnv env + putStrLn "message dropped" + else putStrLn "message not dropped" + void $ tryPutMVar done () + + displayMessage :: Opts -> Message -> String + displayMessage opts msg = + intercalate + "\n" + [ "vhost: " <> cs opts.vhost, + "queue: " <> cs opts.queue, + "timestamp: " <> show msg.msgTimestamp, + "received message: " <> BL.unpack msg.msgBody + ] + + runTimerAsync :: MVar () -> Int -> IO () + runTimerAsync done sec = void $ forkIO $ do + threadDelay (sec * 1000000) + putStrLn $ "timeout after " <> show sec <> " seconds" + void $ tryPutMVar done () data Opts = Opts { host :: String, @@ -59,9 +96,16 @@ data Opts = Opts username :: Text, password :: Text, vhost :: Text, - queue :: Text + queue :: Text, + timeoutSec :: Int, + cmd :: Command + } + +data DropHeadOpts = DropHeadOpts + { path :: String } - deriving (Show) + +data Command = Head | DropHead DropHeadOpts | Interactive optsParser :: Parser Opts optsParser = @@ -115,3 +159,45 @@ optsParser = <> value "test" <> showDefault ) + <*> option + auto + ( long "timeout" + <> short 't' + <> metavar "TIMEOUT" + <> help + "Timeout in seconds. The command will timeout if no messages are received within this time. \ + \This can happen when the queue is empty, \ + \or when we lose the single active consumer race." + <> value 10 + <> showDefault + ) + <*> hsubparser (headCommand <> dropHeadCommand <> interactiveCommand) + +headCommand :: Mod CommandFields Command +headCommand = + (command "head" (info (pure Head) (progDesc "Print the first message in the queue"))) + +dropHeadCommand :: Mod CommandFields Command +dropHeadCommand = + (command "drop-head" (info p (progDesc "Drop the first message in the queue"))) + where + p :: Parser Command + p = + DropHead + . DropHeadOpts + <$> strOption + ( long "path" + <> short 'a' + <> metavar "PATH" + <> help "only drop the first message if the path matches" + ) + +interactiveCommand :: Mod CommandFields Command +interactiveCommand = + (command "interactive" (info (pure Interactive) (progDesc "Interactively drop the first message from the queue"))) + +data MessageJSON = MessageJSON + {path :: String} + deriving (Generic, Show) + +instance FromJSON MessageJSON From 104f752432ed57f9292ff2c19ed0b6809df4df3a Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Thu, 28 Sep 2023 10:59:07 +0000 Subject: [PATCH 07/19] changelog --- changelog.d/5-internal/WPB-4748 | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5-internal/WPB-4748 diff --git a/changelog.d/5-internal/WPB-4748 b/changelog.d/5-internal/WPB-4748 new file mode 100644 index 00000000000..59a431e2b99 --- /dev/null +++ b/changelog.d/5-internal/WPB-4748 @@ -0,0 +1 @@ +CLI tool to consume messages from a RabbitMQ queue From 171b8901e0e1f6b0e4c34372858f5549884ae8e3 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Thu, 28 Sep 2023 12:08:18 +0000 Subject: [PATCH 08/19] nix packages --- tools/rabbitmq-consumer/default.nix | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tools/rabbitmq-consumer/default.nix b/tools/rabbitmq-consumer/default.nix index cbe27109962..141e7716722 100644 --- a/tools/rabbitmq-consumer/default.nix +++ b/tools/rabbitmq-consumer/default.nix @@ -3,10 +3,14 @@ # must be regenerated whenever local packages are added or removed, or # dependencies are added or removed. { mkDerivation +, aeson +, amqp , base +, bytestring , gitignoreSource , imports , lib +, network , optparse-applicative }: mkDerivation { @@ -15,7 +19,15 @@ mkDerivation { src = gitignoreSource ./.; isLibrary = true; isExecutable = true; - libraryHaskellDepends = [ imports optparse-applicative ]; + libraryHaskellDepends = [ + aeson + amqp + base + bytestring + imports + network + optparse-applicative + ]; executableHaskellDepends = [ base ]; description = "CLI tool to consume messages from a RabbitMQ queue"; license = lib.licenses.agpl3Only; From f7c5cf72de1a5aa27935bec0a27dcdbb508077bb Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 08:35:14 +0000 Subject: [PATCH 09/19] added to executablesMap --- nix/wire-server.nix | 1 + 1 file changed, 1 insertion(+) diff --git a/nix/wire-server.nix b/nix/wire-server.nix index 12aafac3ddd..51220ccf979 100644 --- a/nix/wire-server.nix +++ b/nix/wire-server.nix @@ -84,6 +84,7 @@ let zauth = [ "zauth" ]; background-worker = [ "background-worker" ]; integration = [ "integration" ]; + rabbitmq-consumer = [ "rabbitmq-consumer" ]; }; attrsets = lib.attrsets; From 2cea4a859feff30055115db5669b4c32b61e8398 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 08:54:41 +0000 Subject: [PATCH 10/19] no timeout on interactive session, pretty print json --- tools/rabbitmq-consumer/rabbitmq-consumer.cabal | 1 + .../src/RabbitMQConsumer/Lib.hs | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal index c3cdf1de03a..4fa31852b18 100644 --- a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal +++ b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal @@ -27,6 +27,7 @@ library build-depends: , aeson + , aeson-pretty , amqp , base , bytestring diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index f96e3daa432..90076031d82 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -18,12 +18,14 @@ module RabbitMQConsumer.Lib where -import Data.Aeson (FromJSON, decode) +import Data.Aeson (FromJSON, decode, Value) import Data.ByteString.Lazy.Char8 qualified as BL import Imports import Network.AMQP import Network.Socket import Options.Applicative +import Data.Aeson.Encode.Pretty + main :: IO () main = do @@ -32,11 +34,14 @@ main = do chan <- openChannel conn qos chan 0 1 False done <- newEmptyMVar - runTimerAsync done opts.timeoutSec case opts.cmd of - Head -> void $ consumeMsgs chan opts.queue Ack (printHead done opts) Interactive -> void $ consumeMsgs chan opts.queue Ack (interactive done opts) - DropHead dhOpts -> void $ consumeMsgs chan opts.queue Ack (dropHead done opts dhOpts) + Head -> do + runTimerAsync done opts.timeoutSec + void $ consumeMsgs chan opts.queue Ack (printHead done opts) + DropHead dhOpts -> do + runTimerAsync done opts.timeoutSec + void $ consumeMsgs chan opts.queue Ack (dropHead done opts dhOpts) takeMVar done closeConnection conn putStrLn "connection closed" @@ -81,9 +86,10 @@ main = do [ "vhost: " <> cs opts.vhost, "queue: " <> cs opts.queue, "timestamp: " <> show msg.msgTimestamp, - "received message: " <> BL.unpack msg.msgBody + "received message: " <> BL.unpack (maybe msg.msgBody encodePretty (decode @Value msg.msgBody)) ] + runTimerAsync :: MVar () -> Int -> IO () runTimerAsync done sec = void $ forkIO $ do threadDelay (sec * 1000000) From dc9cdd77e065ff29467f090bf052a3b8edb0bf0c Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 09:09:59 +0000 Subject: [PATCH 11/19] formatting --- tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index 90076031d82..991b580f899 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -18,14 +18,13 @@ module RabbitMQConsumer.Lib where -import Data.Aeson (FromJSON, decode, Value) +import Data.Aeson (FromJSON, Value, decode) +import Data.Aeson.Encode.Pretty import Data.ByteString.Lazy.Char8 qualified as BL import Imports import Network.AMQP import Network.Socket import Options.Applicative -import Data.Aeson.Encode.Pretty - main :: IO () main = do @@ -89,7 +88,6 @@ main = do "received message: " <> BL.unpack (maybe msg.msgBody encodePretty (decode @Value msg.msgBody)) ] - runTimerAsync :: MVar () -> Int -> IO () runTimerAsync done sec = void $ forkIO $ do threadDelay (sec * 1000000) From 766d7fcf1463a29cb06915f63e44ee11ec5387d7 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 09:19:48 +0000 Subject: [PATCH 12/19] fixed cabal project file --- cabal.project | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cabal.project b/cabal.project index 66251a6d5a4..2ee9da138e7 100644 --- a/cabal.project +++ b/cabal.project @@ -121,6 +121,8 @@ package proxy ghc-options: -Werror package mlsstats ghc-options: -Werror +package rabbitmq-consumer + ghc-options: -Werror package repair-handles ghc-options: -Werror package rex From e6d8a4885fba88f280d7ce6d4df16e56aabafa9a Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 09:27:50 +0000 Subject: [PATCH 13/19] add nix package --- tools/rabbitmq-consumer/default.nix | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/rabbitmq-consumer/default.nix b/tools/rabbitmq-consumer/default.nix index 141e7716722..4efddeab046 100644 --- a/tools/rabbitmq-consumer/default.nix +++ b/tools/rabbitmq-consumer/default.nix @@ -4,6 +4,7 @@ # dependencies are added or removed. { mkDerivation , aeson +, aeson-pretty , amqp , base , bytestring @@ -21,6 +22,7 @@ mkDerivation { isExecutable = true; libraryHaskellDepends = [ aeson + aeson-pretty amqp base bytestring From c538e960e8f3f33855bf40fe49c2610722d40333 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 09:31:38 +0000 Subject: [PATCH 14/19] added new line --- tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index 991b580f899..c3c39fe737e 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -85,7 +85,7 @@ main = do [ "vhost: " <> cs opts.vhost, "queue: " <> cs opts.queue, "timestamp: " <> show msg.msgTimestamp, - "received message: " <> BL.unpack (maybe msg.msgBody encodePretty (decode @Value msg.msgBody)) + "received message: \n" <> BL.unpack (maybe msg.msgBody encodePretty (decode @Value msg.msgBody)) ] runTimerAsync :: MVar () -> Int -> IO () From 1ea0da9d00343e4a8a348738018e64491acc8d62 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 10:59:42 +0000 Subject: [PATCH 15/19] pretty print --- .../rabbitmq-consumer/rabbitmq-consumer.cabal | 4 ++ .../src/RabbitMQConsumer/Lib.hs | 42 ++++++++++++++----- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal index 4fa31852b18..81eb049de12 100644 --- a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal +++ b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal @@ -34,6 +34,10 @@ library , imports , network , optparse-applicative + , text + , types-common + , wire-api + , wire-api-federation default-extensions: NoImplicitPrelude diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index c3c39fe737e..3712e08ab70 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -18,13 +18,17 @@ module RabbitMQConsumer.Lib where -import Data.Aeson (FromJSON, Value, decode) +import Data.Aeson import Data.Aeson.Encode.Pretty import Data.ByteString.Lazy.Char8 qualified as BL +import Data.Domain (domainText) +import Data.Text.Lazy.Encoding qualified as TL import Imports import Network.AMQP import Network.Socket import Options.Applicative +import Wire.API.Federation.BackendNotifications (BackendNotification (..)) +import Wire.API.RawJson main :: IO () main = do @@ -55,10 +59,10 @@ main = do dropHead :: MVar () -> Opts -> DropHeadOpts -> (Message, Envelope) -> IO () dropHead done opts dhOpts (msg, env) = do putStrLn $ displayMessage opts msg - case decode @MessageJSON msg.msgBody of + case decode @BackendNotification msg.msgBody of Nothing -> putStrLn "failed to decode message body" - Just json -> do - if json.path == dhOpts.path + Just bn -> do + if bn.path == dhOpts.path then do putStrLn "dropping message" nackEnv env @@ -85,7 +89,7 @@ main = do [ "vhost: " <> cs opts.vhost, "queue: " <> cs opts.queue, "timestamp: " <> show msg.msgTimestamp, - "received message: \n" <> BL.unpack (maybe msg.msgBody encodePretty (decode @Value msg.msgBody)) + "received message: \n" <> maybe (BL.unpack msg.msgBody) displayBackendNotification (decode @BackendNotification msg.msgBody) ] runTimerAsync :: MVar () -> Int -> IO () @@ -106,7 +110,7 @@ data Opts = Opts } data DropHeadOpts = DropHeadOpts - { path :: String + { path :: Text } data Command = Head | DropHead DropHeadOpts | Interactive @@ -200,8 +204,26 @@ interactiveCommand :: Mod CommandFields Command interactiveCommand = (command "interactive" (info (pure Interactive) (progDesc "Interactively drop the first message from the queue"))) -data MessageJSON = MessageJSON - {path :: String} - deriving (Generic, Show) +data PrettyMessage = PrettyMessage + { ownDomain :: Text, + targetComponent :: Text, + path :: Text, + body :: Value + } + deriving (Show, Eq, Generic) + +instance ToJSON PrettyMessage -instance FromJSON MessageJSON +instance FromJSON PrettyMessage + +displayBackendNotification :: BackendNotification -> String +displayBackendNotification = BL.unpack . encodePretty . toPrettyMessage + where + toPrettyMessage :: BackendNotification -> PrettyMessage + toPrettyMessage BackendNotification {..} = + PrettyMessage + { ownDomain = domainText ownDomain, + targetComponent = cs $ show targetComponent, + path = path, + body = fromMaybe (String $ cs $ TL.decodeUtf8 body.rawJsonBytes) $ decode @Value body.rawJsonBytes + } From 772bcf0448a5cc2cb30c31a217483e413a99b32e Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 11:03:34 +0000 Subject: [PATCH 16/19] wip --- tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index 3712e08ab70..7d3263ed976 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -21,7 +21,6 @@ module RabbitMQConsumer.Lib where import Data.Aeson import Data.Aeson.Encode.Pretty import Data.ByteString.Lazy.Char8 qualified as BL -import Data.Domain (domainText) import Data.Text.Lazy.Encoding qualified as TL import Imports import Network.AMQP @@ -205,9 +204,9 @@ interactiveCommand = (command "interactive" (info (pure Interactive) (progDesc "Interactively drop the first message from the queue"))) data PrettyMessage = PrettyMessage - { ownDomain :: Text, - targetComponent :: Text, - path :: Text, + { ownDomain :: Value, + targetComponent :: Value, + path :: Value, body :: Value } deriving (Show, Eq, Generic) @@ -222,8 +221,8 @@ displayBackendNotification = BL.unpack . encodePretty . toPrettyMessage toPrettyMessage :: BackendNotification -> PrettyMessage toPrettyMessage BackendNotification {..} = PrettyMessage - { ownDomain = domainText ownDomain, - targetComponent = cs $ show targetComponent, - path = path, + { ownDomain = toJSON ownDomain, + targetComponent = toJSON targetComponent, + path = toJSON path, body = fromMaybe (String $ cs $ TL.decodeUtf8 body.rawJsonBytes) $ decode @Value body.rawJsonBytes } From 2467027e1daaa7b9d6f6ca8c25f14793588ace9b Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 12:10:06 +0000 Subject: [PATCH 17/19] wip --- .../src/RabbitMQConsumer/Lib.hs | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index 7d3263ed976..f131961e1ce 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -27,7 +27,6 @@ import Network.AMQP import Network.Socket import Options.Applicative import Wire.API.Federation.BackendNotifications (BackendNotification (..)) -import Wire.API.RawJson main :: IO () main = do @@ -88,7 +87,7 @@ main = do [ "vhost: " <> cs opts.vhost, "queue: " <> cs opts.queue, "timestamp: " <> show msg.msgTimestamp, - "received message: \n" <> maybe (BL.unpack msg.msgBody) displayBackendNotification (decode @BackendNotification msg.msgBody) + "received message: \n" <> BL.unpack (maybe msg.msgBody encodePretty (decode @BackendNotification' msg.msgBody)) ] runTimerAsync :: MVar () -> Int -> IO () @@ -203,7 +202,8 @@ interactiveCommand :: Mod CommandFields Command interactiveCommand = (command "interactive" (info (pure Interactive) (progDesc "Interactively drop the first message from the queue"))) -data PrettyMessage = PrettyMessage +-- | A variant of 'BackendNotification' that allows encodePretty to work with the body field +data BackendNotification' = BackendNotification' { ownDomain :: Value, targetComponent :: Value, path :: Value, @@ -211,18 +211,15 @@ data PrettyMessage = PrettyMessage } deriving (Show, Eq, Generic) -instance ToJSON PrettyMessage - -instance FromJSON PrettyMessage - -displayBackendNotification :: BackendNotification -> String -displayBackendNotification = BL.unpack . encodePretty . toPrettyMessage - where - toPrettyMessage :: BackendNotification -> PrettyMessage - toPrettyMessage BackendNotification {..} = - PrettyMessage - { ownDomain = toJSON ownDomain, - targetComponent = toJSON targetComponent, - path = toJSON path, - body = fromMaybe (String $ cs $ TL.decodeUtf8 body.rawJsonBytes) $ decode @Value body.rawJsonBytes - } +instance ToJSON BackendNotification' + +instance FromJSON BackendNotification' where + parseJSON = withObject "BackendNotification" $ \o -> + BackendNotification' + <$> o .: "ownDomain" + <*> o .: "targetComponent" + <*> o .: "path" + <*> (bodyToValue . TL.encodeUtf8 <$> o .: "body") + where + bodyToValue :: BL.ByteString -> Value + bodyToValue bs = fromMaybe (String $ cs $ TL.decodeUtf8 bs) $ decode @Value bs From e08fd4eb8d62e7a5bf1452c442c0605ba68666c7 Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 13:04:06 +0000 Subject: [PATCH 18/19] cleaner way --- .../src/RabbitMQConsumer/Lib.hs | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs index f131961e1ce..307d1d30039 100644 --- a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -21,12 +21,14 @@ module RabbitMQConsumer.Lib where import Data.Aeson import Data.Aeson.Encode.Pretty import Data.ByteString.Lazy.Char8 qualified as BL +import Data.Domain (Domain) import Data.Text.Lazy.Encoding qualified as TL import Imports import Network.AMQP import Network.Socket import Options.Applicative import Wire.API.Federation.BackendNotifications (BackendNotification (..)) +import Wire.API.MakesFederatedCall (Component) main :: IO () main = do @@ -202,24 +204,29 @@ interactiveCommand :: Mod CommandFields Command interactiveCommand = (command "interactive" (info (pure Interactive) (progDesc "Interactively drop the first message from the queue"))) --- | A variant of 'BackendNotification' that allows encodePretty to work with the body field -data BackendNotification' = BackendNotification' - { ownDomain :: Value, - targetComponent :: Value, - path :: Value, - body :: Value - } +newtype Body = Body {unBody :: Value} deriving (Show, Eq, Generic) -instance ToJSON BackendNotification' +instance ToJSON Body where + toJSON (Body v) = v -instance FromJSON BackendNotification' where - parseJSON = withObject "BackendNotification" $ \o -> - BackendNotification' - <$> o .: "ownDomain" - <*> o .: "targetComponent" - <*> o .: "path" - <*> (bodyToValue . TL.encodeUtf8 <$> o .: "body") +instance FromJSON Body where + parseJSON v = + Body . bodyToValue . TL.encodeUtf8 <$> parseJSON v where bodyToValue :: BL.ByteString -> Value bodyToValue bs = fromMaybe (String $ cs $ TL.decodeUtf8 bs) $ decode @Value bs + +-- | A variant of 'BackendNotification' with a FromJSON instance for the body field +-- that converts its BL.ByteString content to a JSON value so that it can be pretty printed +data BackendNotification' = BackendNotification' + { ownDomain :: Domain, + targetComponent :: Component, + path :: Text, + body :: Body + } + deriving (Show, Eq, Generic) + +instance ToJSON BackendNotification' + +instance FromJSON BackendNotification' From ad2bd1b7342d769587b4decd9fbb00fbca031edb Mon Sep 17 00:00:00 2001 From: Leif Battermann Date: Mon, 16 Oct 2023 13:40:47 +0000 Subject: [PATCH 19/19] nix packages --- tools/rabbitmq-consumer/default.nix | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tools/rabbitmq-consumer/default.nix b/tools/rabbitmq-consumer/default.nix index 4efddeab046..1da708042e2 100644 --- a/tools/rabbitmq-consumer/default.nix +++ b/tools/rabbitmq-consumer/default.nix @@ -13,6 +13,10 @@ , lib , network , optparse-applicative +, text +, types-common +, wire-api +, wire-api-federation }: mkDerivation { pname = "rabbitmq-consumer"; @@ -29,6 +33,10 @@ mkDerivation { imports network optparse-applicative + text + types-common + wire-api + wire-api-federation ]; executableHaskellDepends = [ base ]; description = "CLI tool to consume messages from a RabbitMQ queue";