diff --git a/cabal.project b/cabal.project index e4fe242a3b4..2ee9da138e7 100644 --- a/cabal.project +++ b/cabal.project @@ -1,6 +1,6 @@ repository hackage.haskell.org url: https://hackage.haskell.org/ -index-state: 2023-10-03T15:17:00Z +index-state: 2023-10-03T15:17:00Z packages: integration , libs/bilge/ @@ -51,6 +51,7 @@ packages: , tools/db/repair-brig-clients-table/ , tools/db/service-backfill/ , tools/fedcalls/ + , tools/rabbitmq-consumer , tools/rex/ , tools/stern/ , tools/mlsstats/ @@ -120,6 +121,8 @@ package proxy ghc-options: -Werror package mlsstats ghc-options: -Werror +package rabbitmq-consumer + ghc-options: -Werror package repair-handles ghc-options: -Werror package rex diff --git a/changelog.d/5-internal/WPB-4748 b/changelog.d/5-internal/WPB-4748 new file mode 100644 index 00000000000..59a431e2b99 --- /dev/null +++ b/changelog.d/5-internal/WPB-4748 @@ -0,0 +1 @@ +CLI tool to consume messages from a RabbitMQ queue diff --git a/nix/local-haskell-packages.nix b/nix/local-haskell-packages.nix index 59d940f7cf2..39fe6a23e24 100644 --- a/nix/local-haskell-packages.nix +++ b/nix/local-haskell-packages.nix @@ -53,6 +53,7 @@ service-backfill = hself.callPackage ../tools/db/service-backfill/default.nix { inherit gitignoreSource; }; fedcalls = hself.callPackage ../tools/fedcalls/default.nix { inherit gitignoreSource; }; mlsstats = hself.callPackage ../tools/mlsstats/default.nix { inherit gitignoreSource; }; + rabbitmq-consumer = hself.callPackage ../tools/rabbitmq-consumer/default.nix { inherit gitignoreSource; }; rex = hself.callPackage ../tools/rex/default.nix { inherit gitignoreSource; }; stern = hself.callPackage ../tools/stern/default.nix { inherit gitignoreSource; }; } diff --git a/nix/wire-server.nix b/nix/wire-server.nix index 12aafac3ddd..51220ccf979 100644 --- a/nix/wire-server.nix +++ b/nix/wire-server.nix @@ -84,6 +84,7 @@ let zauth = [ "zauth" ]; background-worker = [ "background-worker" ]; integration = [ "integration" ]; + rabbitmq-consumer = [ "rabbitmq-consumer" ]; }; attrsets = lib.attrsets; diff --git a/tools/rabbitmq-consumer/README.md b/tools/rabbitmq-consumer/README.md new file mode 100644 index 00000000000..387285946ec --- /dev/null +++ b/tools/rabbitmq-consumer/README.md @@ -0,0 +1,30 @@ +# RabbitMQ Consumer + +```txt +rabbitmq-consumer + +Usage: rabbitmq-consumer [-s|--host HOST] [-p|--port PORT] + [-u|--username USERNAME] [-w|--password PASSWORD] + [-v|--vhost VHOST] [-q|--queue QUEUE] + [-t|--timeout TIMEOUT] COMMAND + + CLI tool to consume messages from a RabbitMQ queue + +Available options: + -h,--help Show this help text + -s,--host HOST RabbitMQ host (default: "localhost") + -p,--port PORT RabbitMQ Port (default: 5672) + -u,--username USERNAME RabbitMQ Username (default: "guest") + -w,--password PASSWORD RabbitMQ Password (default: "alpaca-grapefruit") + -v,--vhost VHOST RabbitMQ VHost (default: "/") + -q,--queue QUEUE RabbitMQ Queue (default: "test") + -t,--timeout TIMEOUT Timeout in seconds. The command will timeout if no + messages are received within this time. This can + happen when the queue is empty, or when we lose the + single active consumer race. (default: 10) + +Available commands: + head Print the first message in the queue + drop-head Drop the first message in the queue + interactive Interactively drop the first message from the queue +``` diff --git a/tools/rabbitmq-consumer/app/Main.hs b/tools/rabbitmq-consumer/app/Main.hs new file mode 100644 index 00000000000..0379be52e30 --- /dev/null +++ b/tools/rabbitmq-consumer/app/Main.hs @@ -0,0 +1,23 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2023 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . + +module Main where + +import qualified RabbitMQConsumer.Lib as Lib + +main :: IO () +main = Lib.main diff --git a/tools/rabbitmq-consumer/default.nix b/tools/rabbitmq-consumer/default.nix new file mode 100644 index 00000000000..1da708042e2 --- /dev/null +++ b/tools/rabbitmq-consumer/default.nix @@ -0,0 +1,45 @@ +# WARNING: GENERATED FILE, DO NOT EDIT. +# This file is generated by running hack/bin/generate-local-nix-packages.sh and +# must be regenerated whenever local packages are added or removed, or +# dependencies are added or removed. +{ mkDerivation +, aeson +, aeson-pretty +, amqp +, base +, bytestring +, gitignoreSource +, imports +, lib +, network +, optparse-applicative +, text +, types-common +, wire-api +, wire-api-federation +}: +mkDerivation { + pname = "rabbitmq-consumer"; + version = "1.0.0"; + src = gitignoreSource ./.; + isLibrary = true; + isExecutable = true; + libraryHaskellDepends = [ + aeson + aeson-pretty + amqp + base + bytestring + imports + network + optparse-applicative + text + types-common + wire-api + wire-api-federation + ]; + executableHaskellDepends = [ base ]; + description = "CLI tool to consume messages from a RabbitMQ queue"; + license = lib.licenses.agpl3Only; + mainProgram = "rabbitmq-consumer"; +} diff --git a/tools/rabbitmq-consumer/rabbitmq-consumer.cabal b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal new file mode 100644 index 00000000000..81eb049de12 --- /dev/null +++ b/tools/rabbitmq-consumer/rabbitmq-consumer.cabal @@ -0,0 +1,84 @@ +cabal-version: 3.0 +name: rabbitmq-consumer +version: 1.0.0 +synopsis: CLI tool to consume messages from a RabbitMQ queue +category: Network +author: Wire Swiss GmbH +maintainer: Wire Swiss GmbH +copyright: (c) 2023 Wire Swiss GmbH +license: AGPL-3.0-only +build-type: Simple + +executable rabbitmq-consumer + main-is: Main.hs + build-depends: + , base + , rabbitmq-consumer + + hs-source-dirs: app + +library + hs-source-dirs: src + exposed-modules: RabbitMQConsumer.Lib + default-language: GHC2021 + ghc-options: + -Wall -Wpartial-fields -fwarn-tabs + -optP-Wno-nonportable-include-path + + build-depends: + , aeson + , aeson-pretty + , amqp + , base + , bytestring + , imports + , network + , optparse-applicative + , text + , types-common + , wire-api + , wire-api-federation + + default-extensions: + NoImplicitPrelude + AllowAmbiguousTypes + BangPatterns + ConstraintKinds + DataKinds + DefaultSignatures + DeriveFunctor + DeriveGeneric + DeriveLift + DeriveTraversable + DerivingStrategies + DerivingVia + DuplicateRecordFields + EmptyCase + FlexibleContexts + FlexibleInstances + FunctionalDependencies + GADTs + GeneralizedNewtypeDeriving + InstanceSigs + KindSignatures + LambdaCase + MultiParamTypeClasses + MultiWayIf + NamedFieldPuns + OverloadedLabels + OverloadedRecordDot + PackageImports + PatternSynonyms + PolyKinds + QuasiQuotes + RankNTypes + RecordWildCards + ScopedTypeVariables + StandaloneDeriving + TupleSections + TypeApplications + TypeFamilies + TypeFamilyDependencies + TypeOperators + UndecidableInstances + ViewPatterns diff --git a/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs new file mode 100644 index 00000000000..307d1d30039 --- /dev/null +++ b/tools/rabbitmq-consumer/src/RabbitMQConsumer/Lib.hs @@ -0,0 +1,232 @@ +-- This file is part of the Wire Server implementation. +-- +-- Copyright (C) 2023 Wire Swiss GmbH +-- +-- This program is free software: you can redistribute it and/or modify it under +-- the terms of the GNU Affero General Public License as published by the Free +-- Software Foundation, either version 3 of the License, or (at your option) any +-- later version. +-- +-- This program is distributed in the hope that it will be useful, but WITHOUT +-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +-- details. +-- +-- You should have received a copy of the GNU Affero General Public License along +-- with this program. If not, see . +{-# LANGUAGE OverloadedStrings #-} + +module RabbitMQConsumer.Lib where + +import Data.Aeson +import Data.Aeson.Encode.Pretty +import Data.ByteString.Lazy.Char8 qualified as BL +import Data.Domain (Domain) +import Data.Text.Lazy.Encoding qualified as TL +import Imports +import Network.AMQP +import Network.Socket +import Options.Applicative +import Wire.API.Federation.BackendNotifications (BackendNotification (..)) +import Wire.API.MakesFederatedCall (Component) + +main :: IO () +main = do + opts <- execParser (info (helper <*> optsParser) desc) + conn <- openConnection' opts.host opts.port opts.vhost opts.username opts.password + chan <- openChannel conn + qos chan 0 1 False + done <- newEmptyMVar + case opts.cmd of + Interactive -> void $ consumeMsgs chan opts.queue Ack (interactive done opts) + Head -> do + runTimerAsync done opts.timeoutSec + void $ consumeMsgs chan opts.queue Ack (printHead done opts) + DropHead dhOpts -> do + runTimerAsync done opts.timeoutSec + void $ consumeMsgs chan opts.queue Ack (dropHead done opts dhOpts) + takeMVar done + closeConnection conn + putStrLn "connection closed" + where + desc = header "rabbitmq-consumer" <> progDesc "CLI tool to consume messages from a RabbitMQ queue" <> fullDesc + + printHead :: MVar () -> Opts -> (Message, Envelope) -> IO () + printHead done opts (msg, _env) = do + putStrLn $ displayMessage opts msg + void $ tryPutMVar done () + + dropHead :: MVar () -> Opts -> DropHeadOpts -> (Message, Envelope) -> IO () + dropHead done opts dhOpts (msg, env) = do + putStrLn $ displayMessage opts msg + case decode @BackendNotification msg.msgBody of + Nothing -> putStrLn "failed to decode message body" + Just bn -> do + if bn.path == dhOpts.path + then do + putStrLn "dropping message" + nackEnv env + else do + putStrLn "path does not match. keeping message" + void $ tryPutMVar done () + + interactive :: MVar () -> Opts -> (Message, Envelope) -> IO () + interactive done opts (msg, env) = do + putStrLn $ displayMessage opts msg + putStrLn $ "type 'drop' to drop the message and terminate, or press enter to terminate without dropping the message" + input <- getLine + if input == "drop" + then do + ackEnv env + putStrLn "message dropped" + else putStrLn "message not dropped" + void $ tryPutMVar done () + + displayMessage :: Opts -> Message -> String + displayMessage opts msg = + intercalate + "\n" + [ "vhost: " <> cs opts.vhost, + "queue: " <> cs opts.queue, + "timestamp: " <> show msg.msgTimestamp, + "received message: \n" <> BL.unpack (maybe msg.msgBody encodePretty (decode @BackendNotification' msg.msgBody)) + ] + + runTimerAsync :: MVar () -> Int -> IO () + runTimerAsync done sec = void $ forkIO $ do + threadDelay (sec * 1000000) + putStrLn $ "timeout after " <> show sec <> " seconds" + void $ tryPutMVar done () + +data Opts = Opts + { host :: String, + port :: PortNumber, + username :: Text, + password :: Text, + vhost :: Text, + queue :: Text, + timeoutSec :: Int, + cmd :: Command + } + +data DropHeadOpts = DropHeadOpts + { path :: Text + } + +data Command = Head | DropHead DropHeadOpts | Interactive + +optsParser :: Parser Opts +optsParser = + Opts + <$> strOption + ( long "host" + <> short 's' + <> metavar "HOST" + <> help "RabbitMQ host" + <> value "localhost" + <> showDefault + ) + <*> option + auto + ( long "port" + <> short 'p' + <> metavar "PORT" + <> help "RabbitMQ Port" + <> value 5672 + <> showDefault + ) + <*> strOption + ( long "username" + <> short 'u' + <> metavar "USERNAME" + <> help "RabbitMQ Username" + <> value "guest" + <> showDefault + ) + <*> strOption + ( long "password" + <> short 'w' + <> metavar "PASSWORD" + <> help "RabbitMQ Password" + <> value "alpaca-grapefruit" + <> showDefault + ) + <*> strOption + ( long "vhost" + <> short 'v' + <> metavar "VHOST" + <> help "RabbitMQ VHost" + <> value "/" + <> showDefault + ) + <*> strOption + ( long "queue" + <> short 'q' + <> metavar "QUEUE" + <> help "RabbitMQ Queue" + <> value "test" + <> showDefault + ) + <*> option + auto + ( long "timeout" + <> short 't' + <> metavar "TIMEOUT" + <> help + "Timeout in seconds. The command will timeout if no messages are received within this time. \ + \This can happen when the queue is empty, \ + \or when we lose the single active consumer race." + <> value 10 + <> showDefault + ) + <*> hsubparser (headCommand <> dropHeadCommand <> interactiveCommand) + +headCommand :: Mod CommandFields Command +headCommand = + (command "head" (info (pure Head) (progDesc "Print the first message in the queue"))) + +dropHeadCommand :: Mod CommandFields Command +dropHeadCommand = + (command "drop-head" (info p (progDesc "Drop the first message in the queue"))) + where + p :: Parser Command + p = + DropHead + . DropHeadOpts + <$> strOption + ( long "path" + <> short 'a' + <> metavar "PATH" + <> help "only drop the first message if the path matches" + ) + +interactiveCommand :: Mod CommandFields Command +interactiveCommand = + (command "interactive" (info (pure Interactive) (progDesc "Interactively drop the first message from the queue"))) + +newtype Body = Body {unBody :: Value} + deriving (Show, Eq, Generic) + +instance ToJSON Body where + toJSON (Body v) = v + +instance FromJSON Body where + parseJSON v = + Body . bodyToValue . TL.encodeUtf8 <$> parseJSON v + where + bodyToValue :: BL.ByteString -> Value + bodyToValue bs = fromMaybe (String $ cs $ TL.decodeUtf8 bs) $ decode @Value bs + +-- | A variant of 'BackendNotification' with a FromJSON instance for the body field +-- that converts its BL.ByteString content to a JSON value so that it can be pretty printed +data BackendNotification' = BackendNotification' + { ownDomain :: Domain, + targetComponent :: Component, + path :: Text, + body :: Body + } + deriving (Show, Eq, Generic) + +instance ToJSON BackendNotification' + +instance FromJSON BackendNotification'