Utilizing RabbitMQ with F#

Read Time: 12 minutes

Today's post is an overview of how to use the RabbitMQ message broker with F#. The RabbitMQ tutorials have a lot of great getting-started information, but the .NET section focuses on C#, so I wanted to provide an F# angle on the general use cases.

There are several different use cases for leveraging RabbitMQ. My examples track against most of the tutorials, but with my own twist. The producer generates and publishes a data stream of floats to RabbitMQ. Depending on the specific use case, consumers filter and process messages out of RabbitMQ. I want each example to be a single file, so I also leverage async workflows to run the producer and consumers in the same process. To get started, you will need .NET Core installed. Once that is in place, the application can be set up.

dotnet new console -lang F# -n RabbitMQ
cd RabbitMQ
dotnet add package RabbitMQ.Client --version 5.1.2
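
Once the package reference is added, each example below can be dropped into Program.fs and run with the usual command:

dotnet run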

Based on the types of interaction, I’ve broken the post into sections. For quick reference, the sections are:

Work Queues
Publish/Subscribe
Routing
Topics

There is a lot of repetition, but I want each example to stand on its own. All of the examples below use the same libraries and namespaces, so this single header applies to every one of them. The primary difference between the examples is how the exchange, queue, and routing key are defined and used.

open System
open System.Text
open System.Threading
open RabbitMQ.Client
open RabbitMQ.Client.Events

Work Queues

For this example, the interaction is pretty direct. The producer takes its data stream and publishes a value to the defined queue every 500 milliseconds. The consumer(s) read from the same queue, processing data as it becomes available. The consumer is set up in a similar fashion to the producer; both need a connection and a channel. The consumer additionally needs a handler for receiving messages, and once everything is wired up it must be started with BasicConsume. The final piece is running the producer and consumer workflows. Since this is a quick test, I run everything for 5 seconds and then cancel the workflow.

let producer hostname queue routingKey (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    let result = channel.QueueDeclare(queue = queue, durable = false, exclusive = false, autoDelete = false, arguments = null)

    let rand = Random()

    while not token.IsCancellationRequested do
        let message = sprintf "%f" (rand.NextDouble())
        let body = Encoding.UTF8.GetBytes(message)
        printfn "publish : %s" message
        channel.BasicPublish(exchange = "", routingKey = routingKey, basicProperties = null, body = body)
        Thread.Sleep(500)
}

let consumer id hostname queue (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    let result = channel.QueueDeclare(queue = queue, durable = false, exclusive = false, autoDelete = false, arguments = null)

    let consumer = EventingBasicConsumer(channel)
    consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data: BasicDeliverEventArgs) ->
        let body = data.Body
        let message = Encoding.UTF8.GetString(body)
        printfn "consumed [%s]: %s" id message))

    let consumeResult = channel.BasicConsume(queue = queue, autoAck = true, consumer = consumer)

    while not token.IsCancellationRequested do
        Thread.Sleep(500)
}

[<EntryPoint>]
let main argv =
    let host = "localhost"
    let queue = "data_stream"
    let routingKey = "data_stream"

    async {
        let token = new CancellationTokenSource()
        token.CancelAfter 5000

        seq {
            (producer host queue routingKey token);
            (consumer "1" host queue token);
            (consumer "2" host queue token) }
        |> Async.Parallel
        |> Async.RunSynchronously |> ignore
    } |> Async.RunSynchronously

    0

The output shows the two consumers picking data off the data_stream queue; each message is delivered to only one of them.

publish : 0.750373
consumed [1]: 0.750373
publish : 0.130004
consumed [2]: 0.130004
publish : 0.279026
consumed [1]: 0.279026
publish : 0.709584
consumed [2]: 0.709584
publish : 0.573295
consumed [1]: 0.573295
...
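
The consumer above uses autoAck = true, so RabbitMQ considers a message delivered as soon as it is pushed to the consumer. If processing could fail partway through, manual acknowledgments are the safer option; the RabbitMQ work queue tutorial covers this for C#. Below is a minimal sketch of what the consumer changes might look like in F#, using the standard BasicQos, BasicAck, and BasicConsume calls. This is not part of the example above, just an illustration.

    // Fair dispatch: give each consumer at most one unacknowledged message at a time.
    channel.BasicQos(prefetchSize = 0u, prefetchCount = 1us, ``global`` = false)

    let consumer = EventingBasicConsumer(channel)
    consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data: BasicDeliverEventArgs) ->
        let message = Encoding.UTF8.GetString(data.Body)
        printfn "consumed: %s" message
        // Acknowledge only after processing completes, so failures trigger redelivery.
        channel.BasicAck(deliveryTag = data.DeliveryTag, multiple = false)))

    // autoAck = false: unacknowledged messages are requeued if this consumer dies.
    let consumeResult = channel.BasicConsume(queue = queue, autoAck = false, consumer = consumer)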

Publish/Subscribe

In contrast to the previous example, a fanout exchange distributes each published message to every subscribed consumer. Of note, here I declare an exchange and publish with a routing key, although a fanout exchange ignores the routing key. Each consumer also declares a temporary, server-named queue and binds it to the exchange, which wasn't required in the previous example.

let producer hostname exchange routingKey (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Fanout, durable = false, autoDelete = false, arguments = null)

    let rand = Random()

    while not token.IsCancellationRequested do
        let message = sprintf "%f" (rand.NextDouble())
        let body = Encoding.UTF8.GetBytes(message)
        printfn "publish: %s" message
        channel.BasicPublish(exchange = exchange, routingKey = routingKey, basicProperties = null, body = body)
        Thread.Sleep(500)
}

let consumer id hostname exchange routingKey (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Fanout)

    // Server-named, temporary queue bound to the fanout exchange.
    let queueName = channel.QueueDeclare().QueueName
    channel.QueueBind(queue = queueName, exchange = exchange, routingKey = routingKey)

    let consumer = EventingBasicConsumer(channel)
    consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data: BasicDeliverEventArgs) ->
        let body = data.Body
        let message = Encoding.UTF8.GetString(body)
        printfn "consumed [%s]: %A" id message))

    let consumeResult = channel.BasicConsume(queue = queueName, autoAck = true, consumer = consumer)

    while not token.IsCancellationRequested do
        Thread.Sleep(500)
}

[<EntryPoint>]
let main argv =
    let host = "localhost"
    let exchange = "pubsub_exchange"
    let routingKey = "pubsub_route"

    async {
        let token = new CancellationTokenSource()
        token.CancelAfter 5000

        seq {
            (producer host exchange routingKey token);
            (consumer "1" host exchange routingKey token);
            (consumer "2" host exchange routingKey token) }
        |> Async.Parallel
        |> Async.RunSynchronously |> ignore

        printfn "workflow complete"
    } |> Async.RunSynchronously

    0

The below results show each consumer processing every message.

publish: 0.178846
consumed [1]: "0.178846"
consumed [2]: "0.178846"
publish: 0.734746
consumed [1]: "0.734746"
consumed [2]: "0.734746"
publish: 0.171448
consumed [2]: "0.171448"
consumed [1]: "0.171448"
publish: 0.890113
consumed [2]: "0.890113"
consumed [1]: "0.890113"
...

Routing

For this example, the producer uses a direct exchange and routes each message to the 'even' or 'odd' routing key based on whether the first decimal digit is even or odd. Two consumers are set up: one listens for even messages, the other for odd messages.

let producer hostname exchange (routingKeys: string list) (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Direct, durable = false, autoDelete = false, arguments = null)

    let rand = Random()

    while not token.IsCancellationRequested do
        let data = rand.NextDouble()
        let message = sprintf "%f" data
        let body = Encoding.UTF8.GetBytes(message)
        printfn "publish : %s" message
        // Route on whether the first decimal digit is even or odd.
        let routingKey =
            if int (data * 10.) % 2 = 0
            then routingKeys.[0]
            else routingKeys.[1]
        channel.BasicPublish(exchange = exchange, routingKey = routingKey, basicProperties = null, body = body)
        Thread.Sleep(500)
}

let consumer id hostname exchange routingKey (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Direct)

    let queueName = channel.QueueDeclare().QueueName
    channel.QueueBind(queue = queueName, exchange = exchange, routingKey = routingKey)

    let consumer = EventingBasicConsumer(channel)
    consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data: BasicDeliverEventArgs) ->
        let body = data.Body
        let message = Encoding.UTF8.GetString(body)
        printfn "consumed [%s]: %s" id message))

    let consumeResult = channel.BasicConsume(queue = queueName, autoAck = true, consumer = consumer)

    while not token.IsCancellationRequested do
        Thread.Sleep(500)
}

[<EntryPoint>]
let main argv =
    let host = "localhost"
    let exchange = "routing_exchange"
    let routingKey0 = "even"
    let routingKey1 = "odd"

    async {
        let token = new CancellationTokenSource()
        token.CancelAfter 5000

        seq {
            (producer host exchange [routingKey0; routingKey1] token);
            (consumer "0" host exchange routingKey0 token);
            (consumer "1" host exchange routingKey1 token) }
        |> Async.Parallel
        |> Async.RunSynchronously |> ignore
    } |> Async.RunSynchronously

    0

The below results show consumer 0 filtering on even messages and consumer 1 filtering on odd messages.

publish : 0.910740
consumed [1]: 0.910740
publish : 0.650291
consumed [0]: 0.650291
publish : 0.128357
consumed [1]: 0.128357
publish : 0.750614
consumed [1]: 0.750614
...
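
The tutorials also show that a single queue can be bound to more than one routing key. As a hypothetical variant of the consumer above (not in the original example), a consumer that binds its queue to both keys would receive every message, along with the key it was routed on:

let consumerAll id hostname exchange (routingKeys: string list) (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Direct)

    let queueName = channel.QueueDeclare().QueueName
    // One QueueBind per routing key; the same queue then receives both even and odd messages.
    routingKeys |> List.iter (fun key ->
        channel.QueueBind(queue = queueName, exchange = exchange, routingKey = key))

    let consumer = EventingBasicConsumer(channel)
    consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data: BasicDeliverEventArgs) ->
        let message = Encoding.UTF8.GetString(data.Body)
        printfn "consumed [%s] (%s): %s" id data.RoutingKey message))

    let consumeResult = channel.BasicConsume(queue = queueName, autoAck = true, consumer = consumer)

    while not token.IsCancellationRequested do
        Thread.Sleep(500)
}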

Topics

The topic exchange type provides a mechanism for consumers to apply more advanced filtering to the messages they process. In this example, the same even/odd producer is used, but with routing keys of all.even and all.odd. The consumers bind with patterns that support wildcards: all.* matches messages whose routing key starts with 'all', while *.even matches messages whose routing key ends in 'even'. This lets you structure topics so that consumers subscribe to exactly the slices of the stream they care about.

let producer hostname exchange (routingKeys: string list) (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Topic, durable = false, autoDelete = false, arguments = null)

    let rand = Random()

    while not token.IsCancellationRequested do
        let data = rand.NextDouble()
        let message = sprintf "%f" data
        let body = Encoding.UTF8.GetBytes(message)
        printfn "publish : %s" message
        // Route on whether the first decimal digit is even or odd.
        let routingKey =
            if int (data * 10.) % 2 = 0
            then routingKeys.[0]
            else routingKeys.[1]
        channel.BasicPublish(exchange = exchange, routingKey = routingKey, basicProperties = null, body = body)
        Thread.Sleep(500)
}

let consumer id hostname exchange routingKey (token: CancellationTokenSource) = async {
    let factory = ConnectionFactory(HostName = hostname)
    use connection = factory.CreateConnection()
    use channel = connection.CreateModel()
    channel.ExchangeDeclare(exchange = exchange, ``type`` = ExchangeType.Topic)

    let queueName = channel.QueueDeclare().QueueName
    channel.QueueBind(queue = queueName, exchange = exchange, routingKey = routingKey)

    let consumer = EventingBasicConsumer(channel)
    consumer.Received.AddHandler(new EventHandler<BasicDeliverEventArgs>(fun sender (data: BasicDeliverEventArgs) ->
        let body = data.Body
        let message = Encoding.UTF8.GetString(body)
        printfn "consumed [%s]: %s" id message))

    let consumeResult = channel.BasicConsume(queue = queueName, autoAck = true, consumer = consumer)

    while not token.IsCancellationRequested do
        Thread.Sleep(500)
}

[<EntryPoint>]
let main argv =
    let host = "localhost"
    let exchange = "topic_exchange"
    let routingKey0 = "all.even"
    let routingKey1 = "all.odd"

    async {
        let token = new CancellationTokenSource()
        token.CancelAfter 5000

        seq {
            (producer host exchange [routingKey0; routingKey1] token);
            (consumer "0" host exchange routingKey0 token);
            (consumer "1" host exchange routingKey1 token);
            (consumer "2" host exchange "all.*" token);
            (consumer "3" host exchange "*.even" token) }
        |> Async.Parallel
        |> Async.RunSynchronously |> ignore
    } |> Async.RunSynchronously

    0

The below results show consumers pulling their appropriately filtered messages. Even messages are processed by consumers 0, 2, and 3. Odd messages are processed by consumers 1 and 2.

publish : 0.789320
consumed [2]: 0.789320
consumed [1]: 0.789320
publish : 0.163861
consumed [1]: 0.163861
consumed [2]: 0.163861
publish : 0.256774
consumed [0]: 0.256774
consumed [3]: 0.256774
consumed [2]: 0.256774
publish : 0.273446
consumed [3]: 0.273446
consumed [2]: 0.273446
consumed [0]: 0.273446
...
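
In topic bindings, * matches exactly one dot-delimited word and # matches zero or more words, so a binding of # would receive every message on the exchange regardless of routing key. As a hypothetical addition to the workflow above (the id "4" is just my own label), a catch-all consumer could be added to the seq:

            // Hypothetical catch-all consumer: '#' matches any routing key on a topic exchange.
            (consumer "4" host exchange "#" token)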

This has been a quick overview of how RabbitMQ can be used with F#. It only scratches the surface of what can be done. Until next time.