Add RabbitMQ and gocron to your DigitalOcean droplet [Part 3]

Building a simple message consumer with gocron

Alican Sungur
CodeX

--

(This is the last part of a 3 part series. You can read the previous part here)

Photo by Arwin Neil Baichoo on Unsplash

Summary of the previous chapter

In the previous chapter, we’ve built our message producer function enqueue. The enqueue function is invoked when our application server receives traffic and publishes a message to RabbitMQ. Depending on your configuration you can call enqueue() function call elsewhere.

Building the consumer

Next, we will build our cron-job to run every hour/minute(depending on your preference) and dequeue messages in our queue. To keep things simple, when a message is consumed we will send a basic notification email using Mailgun’s go package.

(You can find the repo of the simple consumer from here)

In this example, the worker will run in the background and every 2 minutes it will execute processMessages function. We will talk about setting up this worker to run as a systemdservice later.

Similar to the previous example where we create a Connection and a Channel to be able to execute tasks on our queue.

amqpServerURL := os.Getenv("AMQP_SERVER_URL")
connectRabbitMQ, err := amqp.Dial(amqpServerURL)
if err != nil {
panic(err)
}
defer connectRabbitMQ.Close()
channelRabbitMQ, err := connectRabbitMQ.Channel()
if err != nil {
panic(err)
}
defer channelRabbitMQ.Close()

We will be using Consume() function on our channel to retrieve the messages. You can see the comments for the options. To keep things simple we will automatically acknowledge the messages as we consume them.

messages, err := channelRabbitMQ.Consume(
"FallbackAPIQueue",
"",
true, // auto acknowledge
false, // exclusive
false, // no local
false, // no wait
nil,
)
if err != nil {
log.Println(err)
}
log.Println("You've connected to RabbitMQ")
log.Println("Waiting for messages")

if len(messages) == 0 {
log.Println("You don't have any messages in the queue")
}
for message := range messages {
if string(message.Body) == "A request has been sent via fallback API" {
sendSimpleMessage()
}
}

Lastly, for each message we consume, we will call sendSimpleMessage()

Using Mailgun’s go client library mailgun-go we can send simple emails. This will require you to have an account with Mailgun. You will need your Mailgun domain and your API key saved in your environment variables. There is a link in the resources section below if you don’t have a Mailgun account.

func sendSimpleMessage() {
api_key := os.Getenv("MAILGUN_API_KEY")
sandbox_domain := os.Getenv("MAILGUN_DOMAIN")

mg := mailgun.NewMailgun(sandbox_domain, api_key)
sender := "testingsomething@example.com"
subject := "security warning"
body := "Your fallback api is exposed"
recipient := "<YOUR_EMAIL>"
message := mg.NewMessage(sender, subject, body, recipient)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
resp, id, err := mg.Send(ctx, message)
if err != nil {
log.Fatal(err)
}
fmt.Printf("ID: %s Resp: %s\\n", id, resp)
}

Now, let's configure systemd to run this application as a service.

systemd is an initialisation system and system manager that comes with most Linux distributions. It comes with its management tool systemctl

If you want to learn more about systemd or if you are new to it, you can find a tutorial link below in the resources. Let’s get back to creating our service.

First, we will create a service file:

$ sudo vim /lib/systemd/system/simple_consumer.service

Next, we will add the following configuration options in our simple_consumer.service:

Before we move on, let’s talk about some of these configuration steps:

  1. On line 3, ConditionPathExists is required to make sure that our application directory is present in our server before it starts running
  2. After on line 6, tells systemd to wait until postgresql service is started. This is required because I’m integrating this into my simple API server. This might not be required depending on your configuration.
  3. On line 12, we specify WorkingDirectory . This is important to retrieve our environment variables from our .env file.
  4. Lastly, ExecStart specifies the location of our executable file

Before we enable our system, we need to give relevant permissions to run our application:

$ sudo chmod +x /opt/appDir/simple_consumer

Now using systemctl we can first enable the service and start it. We can check the status of the application using the last command:

$ sudo systemctl enable simple_consumer      # Enable the service
$ sudo systemctl start simple_consumer # Start the service
$ sudo systemctl status simple_consumer # Check service status

Now lets trigger enqueue a couple of times and see our queue before and after the cron-job is run:

$ sudo rabbitmqctl list_queuesTimeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
FallbackAPIQueue 3

The number next to FallbackAPIQueue should go down to 0 and in our inbox, we should have an email for each message that is dequeued from our queue 🎊

Resources

--

--

Alican Sungur
CodeX
Writer for

I write about distributed systems, and other programming related things https://www.linkedin.com/in/sunguralican/