Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
hassansin committed May 24, 2017
0 parents commit 70a3fb8
Show file tree
Hide file tree
Showing 220 changed files with 50,073 additions and 0 deletions.
114 changes: 114 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
## amqptools

A brief description of your application

### Synopsis


A longer description that spans multiple lines and likely contains
examples and usage of using your application. For example:

### Options

```
-h, --help help for amqptools
```

### SEE ALSO
* [amqptools consume](#amqptools-consume) - Consumes messages
* [amqptools publish](#amqptools-publish) - Publishes a message

## amqptools consume

Consumes messages

### Synopsis


Consume messages
Uses the default exchange '', When no exchange is provided
Use comma-separated values for binding the same queue with multiple routing keys:
amqptools consume --exchange logs --keys info,warning,debug



```
amqptools consume [flags]
```

### Examples

```
ampqtool consume -H ampq.example.com -P 5672 --exchange amq.direct --durable-queue
```

### Options

```
-H, --host string specify host (default "localhost")
-P, --port int specify port (default 5672)
-v, --vhost string specify vhost (default "/")
-u, --username string specify username (default "guest")
-p, --password string specify password (default "guest")
-e, --exchange string exchange name (default "")
-k, --key string routing key (default "")
-t, --type string exchange type (default "direct")
--durable durable exchange
-q, --queue string specify queue (default auto-generated)
-n, --number int retrieve maximum n messages. 0 = forever (default 0)
--passive passive queue
--exclusive exclusive queue
--durable-queue durable queue
--no-ack don't send ack
-h, --help help for consume
```

### SEE ALSO
* [amqptools](amqptools.md) - A brief description of your application

## amqptools publish

Publishes a message

### Synopsis


Publish a message using exchange and routing key.
mesage can be string or stdin:
echo 'hello world' | amqptools publish --exchange=logs --key=info



```
amqptools publish [flags] [message]
```

### Examples

```
ampqtools publish -H ampq.example.com -P 5672 --exchange=amq.direct --key=hello "hello world"
amqptools publish "hello world" --properties="content-type:text/html" --properties="expiration:3000"
```

### Options

```
-H, --host string specify host (default "localhost")
-P, --port int specify port (default 5672)
-v, --vhost string specify vhost (default "/")
-u, --username string specify username (default "guest")
-p, --password string specify password (default "guest")
-e, --exchange string exchange name (default "")
-k, --key string routing key (default "")
-t, --type string exchange type (default "direct")
--durable durable exchange
--properties string message properties, key:value format
--headers string message headers, key:value format
-h, --help help for publish
```

### SEE ALSO
* [amqptools](amqptools.md) - A brief description of your application

114 changes: 114 additions & 0 deletions cmd/consume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package cmd

import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/spf13/cobra"
"github.com/streadway/amqp"
)

// consumeCmd represents the consume command
var consumeCmd = &cobra.Command{
Use: "consume [flags]",
Aliases: []string{"receive"},
Short: "Consumes messages",
Long: `Consume messages
Uses the default exchange '', When no exchange is provided
Use comma-separated values for binding the same queue with multiple routing keys:
amqptools consume --exchange logs --keys info,warning,debug
`,
Example: ` ampqtool consume -H ampq.example.com -P 5672 --exchange amq.direct --durable-queue
`,
PreRunE: func(cmd *cobra.Command, args []string) error {
if len(args) > 0 {
return errors.New("unknown arg: " + args[0])
}
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
cmd.SilenceUsage = true
cmd.SilenceErrors = true

args = append(args, "", "")

// Dial amqp server
uri := "amqp://" + username + ":" + password + "@" + host + ":" + strconv.Itoa(port) + vhost
conn, err := amqp.Dial(uri)
if err != nil {
return fmt.Errorf("connection.open: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
return fmt.Errorf("channel.open: %v", err)
}
closes := ch.NotifyClose(make(chan *amqp.Error, 1))

q, err := ch.QueueDeclare(queue, durableQueue, false, exclusive, false, nil)
if err != nil {
return fmt.Errorf("queue.declare: %v", err)
}
// bind queue for non-default exchange
if exchange != "" {
if err = ch.ExchangeDeclare(exchange, exchangeType, durableExchange, false, false, false, nil); err != nil {
return fmt.Errorf("exchange.declare: %v", err)
}
// bind the queue to all routingkeys
for _, key := range strings.Split(routingkey, ",") {
if err = ch.QueueBind(queue, key, exchange, false, nil); err != nil {
return fmt.Errorf("queue.bind: %v", err)
}
}
}

prefetchCount := 10
if number > 0 {
prefetchCount = number
}
if err = ch.Qos(prefetchCount, 0, false); err != nil {
return fmt.Errorf("basic.qos: %v", err)
}

msgs, err := ch.Consume(queue, "", !noAck, exclusive, false, false, nil)
if err != nil {
return fmt.Errorf("basic.consume: %v", err)
}
count := 0

go func() {
for msg := range msgs {
fmt.Printf("Timestamp: %s\n", msg.Timestamp)
fmt.Printf("Exchange: %s\n", msg.Exchange)
fmt.Printf("RoutingKey: %s\n", msg.RoutingKey)
fmt.Printf("Queue: %s\n", q.Name)
fmt.Printf("Headers: %v\n", msg.Headers)
fmt.Printf("Payload: \n%s\n\n", msg.Body)

count++
if number != 0 && count >= number {
ch.Close()
}
}
}()
fmt.Println("Waiting for messages. To exit press CTRL+C\n")
<-closes
return nil
},
}

func init() {
RootCmd.AddCommand(consumeCmd)
consumeCmd.Flags().AddFlagSet(commonFlagSet())
consumeCmd.Flags().StringVarP(&queue, "queue", "q", "", "specify queue (default auto-generated)")
consumeCmd.Flags().IntVarP(&number, "number", "n", 0, "retrieve maximum n messages. 0 = forever (default 0)")
consumeCmd.Flags().BoolVarP(&passive, "passive", "", false, "passive queue")
consumeCmd.Flags().BoolVarP(&exclusive, "exclusive", "", false, "exclusive queue")
consumeCmd.Flags().BoolVarP(&durableQueue, "durable-queue", "", false, "durable queue")
consumeCmd.Flags().BoolVarP(&noAck, "no-ack", "", false, "don't send ack")
consumeCmd.Flags().SortFlags = false
}
62 changes: 62 additions & 0 deletions cmd/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cmd

import (
"bytes"
"os"
"path"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/cobra/doc"
)

// consumeCmd represents the consume command
var docCmd = &cobra.Command{
Use: "doc",
Short: "Generate markdown document",
Hidden: true,
RunE: func(cmd *cobra.Command, args []string) error {
cmd.SilenceUsage = true
cmd.SilenceErrors = true

rootDoc := new(bytes.Buffer)
if err := doc.GenMarkdownCustom(RootCmd, rootDoc, func(name string) string {
base := strings.TrimSuffix(name, path.Ext(name))
return "#" + strings.Replace(base, "_", "-", 5)
}); err != nil {
return err
}

consumeDoc := new(bytes.Buffer)
if err := doc.GenMarkdown(consumeCmd, consumeDoc); err != nil {
return err
}

produceDoc := new(bytes.Buffer)
if err := doc.GenMarkdown(produceCmd, produceDoc); err != nil {
return err
}

f, err := os.Create("README.md")
if err != nil {
return err
}
defer f.Close()
if _, err = f.Write(rootDoc.Bytes()); err != nil {
return err
}
if _, err = f.Write(consumeDoc.Bytes()); err != nil {
return err
}
if _, err = f.Write(produceDoc.Bytes()); err != nil {
return err
}
f.Sync()

return nil
},
}

func init() {
RootCmd.AddCommand(docCmd)
}
106 changes: 106 additions & 0 deletions cmd/produce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package cmd

import (
"fmt"
"io/ioutil"
"os"
"reflect"
"strconv"
"time"

"github.com/spf13/cobra"
"github.com/streadway/amqp"
)

// produceCmd represents the produce command
var produceCmd = &cobra.Command{
Use: "publish [flags] [message]",
Aliases: []string{"produce", "send"},
Short: "Publishes a message",
Long: `Publish a message using exchange and routing key.
mesage can be string or stdin:
echo 'hello world' | amqptools publish --exchange=logs --key=info
`,
Example: ` ampqtools publish -H ampq.example.com -P 5672 --exchange=amq.direct --key=hello "hello world"
amqptools publish "hello world" --properties="content-type:text/html" --properties="expiration:3000"
`,
PreRunE: func(cmd *cobra.Command, args []string) error {
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
cmd.SilenceUsage = true
cmd.SilenceErrors = true

var message string
if len(args) > 0 {
message = args[0]
} else {
bytes, err := ioutil.ReadAll(os.Stdin)
if err != nil {
return fmt.Errorf("read file: %v", err)
}
message = string(bytes)
}

uri := "amqp://" + username + ":" + password + "@" + host + ":" + strconv.Itoa(port) + vhost
conn, err := amqp.Dial(uri)
if err != nil {
return fmt.Errorf("connection.open: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
return fmt.Errorf("channel.open: %v", err)
}
if exchange != "" {
if err = ch.ExchangeDeclare(exchange, exchangeType, durableExchange, false, false, false, nil); err != nil {
return fmt.Errorf("exchange.declare: %v", err)
}
}

msg := amqp.Publishing{
Headers: headers.Table,
Timestamp: time.Now(),
Body: []byte(message),
}

for key, val := range properties.Table {
v := reflect.ValueOf(&msg).Elem().FieldByName(key)
if v.IsValid() {
v.SetString(val.(string))
}
}

if err := ch.Confirm(false); err != nil {
return fmt.Errorf("confirm.select destination: %s", err)
}

returns := ch.NotifyReturn(make(chan amqp.Return, 1))
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))

if err = ch.Publish(exchange, routingkey, true, false, msg); err != nil {
return fmt.Errorf("basic.publish: %v", err)
}
select {
case r := <-returns:
fmt.Println("Message not delivered: " + r.ReplyText)
case c := <-confirms:
if c.Ack {
fmt.Println("Message published")
} else {
fmt.Println("Message failed to publish")
}
}
return nil
},
}

func init() {
RootCmd.AddCommand(produceCmd)
produceCmd.Flags().AddFlagSet(commonFlagSet())
produceCmd.Flags().VarP(&properties, "properties", "", "message properties, key:value format")
produceCmd.Flags().VarP(&headers, "headers", "", "message headers, key:value format")
produceCmd.Flags().SortFlags = false
}
Loading

0 comments on commit 70a3fb8

Please sign in to comment.