Lucas' Blog

cat /dev/thoughts

Building a Chat From Scratch With Go and MQTT

Recently I started learning Go and about Messaging Protocols, and as I think that is easier to learn something while also putting it into practice, I made a very basic chat that I want to share with you.

The Go documentation is, at the same time, complete and scarce. Every package has its own GoDocs, but sometimes with very little explanation, then I needed to dig further to understand better the MQTT Go Library. So I thought I could make a blog post in order to share what I learned and expand the community’s material.

For this tutorial, I used the Go language, the Go package for MQTT protocol, RabbitMQ and its MQTT plugin; and the code was inspired in this example.

What about all those MQs?

Beforehand I need to explain the difference between the broker and the protocols. There are three main messaging protocols: AMQP, MQTT, and STOMP. Each of these has their pros and cons, and situations in with one of them is a better choice.

Then there is the also the brokers, like RabbitMQ, Mosquitto, NSQ and ZeroMQ. The brokers are programs that may implement one or more messaging protocols and are used in order to pass the message between the publisher and the subscriber (we will understand better these names below).

In this tutorial, I used MQTT v3.1.1 with RabbitMQ. I also played with AMPQ but found MQTT much easier and interesting, especially its use cases (IoT).

The setup

First I had to install Go, then I installed RabbitMQ with

1
sudo apt-get install rabbitmq-server

and install these two Go libraries

1
2
go get github.com/akamensky/argparse
go get github.com/eclipse/paho.mqtt.golang

Then I just enabled the MQTT plugin on the RabbitMQ and adjusted two users (user1 and user2, in with user and password are the same)

1
2
3
4
5
sudo rabbitmq-plugins enable rabbitmq_mqtt
rabbitmqctl add_user user1 user1
rabbitmqctl add_user user2 user2
rabbitmqctl set_permissions -p / user1 ".*" ".*" ".*"
rabbitmqctl set_permissions -p / user2 ".*" ".*" ".*"

The code

The main loop first gets user and password from the input arguments and assemble the full URL service:

1
2
3
4
5
6
7
8
9
10
11
func main() {
    user, passwd := parseUserArgs()
    fullUrl := fmt.Sprintf("mqtt://%s:%s@localhost:1883/test", user, passwd)
    uri, err := url.Parse(fullUrl)
    failOnError(err, "Failed to parse given URL")

    forever := make(chan bool)
    go listen(uri)
    go poolMessage(uri, user)
    <-forever
}

Then a channel is created in order to keep the program running and two goroutines are created: one to listen to the messages from the broker (the subscriber), and other to get the message from the output and send it to the broker (the producer).

The consumer is very simple, it creates a client, connected to the given URI, and, every time it receives a message, it calls the callback function and prints it on the screen:

1
2
3
4
5
6
7
8
func showMessage(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("* %s\n", string(msg.Payload()))
}

func listen(uri *url.URL) {
    client := connect(uri)
    client.Subscribe(parseTopic(uri), QOS_AT_MOST_ONCE, showMessage)
}

The first parameter of Subscribe is the topic, that is like the channel we are listening to. Topics are very interesting, and can even have a hierarchy that turns easier to broadcast messages and share specific rules. More about topics can be read here. The topic we are using here is the path of our URI, test.

The second argument is about the Quality of Service, or the level of confidence we can trust our message will be delivered. There are three levels:

  • At most once (0): The level we used in our chat, also know as Fire-and-Forget. It sends a message and doesn’t wait for any kind of confirmation. Thus, messages will be delivered once or none;
  • At least once (1): The message will be delivered and, after a while, if no response is returned, it will be sent again. Thus, messages will be delivered one or more times;
  • Exactly once (2): This is the slowest QoS because it has a four part handshake, that assures the message will be delivered once, no more or less.

The third argument is the callback function, in this case, the showMessage, which is called with the client and the message to be printed.

The producer, at its turn, waits until a message is typed and then sends it to the broker, it is the poolMessage:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func sendMessage(msg string, uri *url.URL) {
    client := connect(uri)
    RETAIN_MESSAGE := false
    client.Publish(parseTopic(uri), QOS_AT_MOST_ONCE, RETAIN_MESSAGE, msg)
}

func poolMessage(uri *url.URL, user string) {
    for {
        r := bufio.NewReader(os.Stdin)
        msg, _ := r.ReadString('\n')
        msg = fmt.Sprintf("%s: %s", user, strings.TrimSpace(msg))
        sendMessage(msg, uri)
    }
}

I used bufio, and not fmt.Scanf because I found a lot easier to read spaces from the terminal with the former. After reading the input, the message is passed to a function to be sent.

The Publish’s parameters are very similar to Subscribe, the only difference is the RETAIN_MESSAGE. When this argument is flagged true, the broker stores the last message and every time a new user subscribes it receives that retained message.

I experimented using the parameter as true to see how it worked and had some trouble trying to remove the retained message after. As I did not want to receive that message every time I connected I discovered I had to overwrite it with a null value.

I tried to publish a message with a null value, without success, and searched for a RabbitMQ client interface, and also didn’t find any. The solution I had was using the Mosquitto client

1
mosquitto_pub -n -r -t "test"

In which I publish (mosquitto_pub) a null message (-n) as retained (-r) to the topic “test” (-t "test").

But I’m still uncomfortable not knowing how to do that with the Go’s MQTT.

The last piece of this code is the connect function, which is quite simple, it gets a few options mqtt.ClientOptions, and connects a new client. The loop with the token.WaitTimeout waits until the connection is established, checking it each microsecond:

1
2
3
4
5
6
7
8
9
func connect(uri *url.URL) mqtt.Client {
    opts := createClientOptions(uri)
    client := mqtt.NewClient(opts)
    token := client.Connect()
    for !token.WaitTimeout(time.Microsecond) {
    }
    failOnError(token.Error(), "Failed while connecting")
    return client
}

The options are built using the data passed in URI, telling where the broker is, who is the user and its password. It is still possible to SetClientID in order to keep a session for a unique client (I did not use it here for simplicity’s sake):

1
2
3
4
5
6
7
8
9
func createClientOptions(uri *url.URL) *mqtt.ClientOptions {
    password, _ := uri.User.Password()
    name := uri.User.Username()
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s", uri.Host))
    opts.SetUsername(name)
    opts.SetPassword(password)
    return opts
}

There were some boilerplates I just skipped. The complete code can be seen here.

Conclusion

This is a quite simple tutorial and I know it did not cover all the subject. Feel free to share any ideas or questions.

Comments