Building an MQTT server in F# - Part 1

Read Time: 7 minutes

F# isn’t often mentioned when the discussion turns to writing a server, but it is more than capable of the task. What it brings to the table is an asset compared to other languages, and it deserves to be considered more often. Beyond the language itself there are supporting pieces, like Kestrel, that become a huge resource when venturing down this path. In this series of posts I’ll use MQTT as an implementation case study.

In the past, writing a server would require a lot of socket and connection management code. But .NET Core and Kestrel have more to offer. The most common usage associated with Kestrel is obviously web servers, but it provides more flexibility than that. This means I can build an MQTT server on top of the base server networking that Kestrel provides. That lets me focus more attention on the server functionality, and less on the idiosyncrasies of networking. As a bonus, when Microsoft improves the networking libraries, I get the benefits for free. The question then is how best to examine this interaction. Building an MQTT server is a good candidate, since it is complicated enough to require more advanced modeling, but simple enough that its protocol is approachable. For reference, the MQTT Spec is here for your reading pleasure.

This is a longer project, so I’ll break it into several posts. The first post is about getting the minimal server connectivity set up in F#. To set expectations, I won’t really touch MQTT here. It’s more about setting up a foundation for future posts. Additional posts will dig into implementation details and how to leverage F#-specific power when writing a server.

Now, to get started on some server creation. For reference, this is implemented using .NET Core 3.1. As a start, I’ll make my modifications based on a web template. The result resembles the boilerplate of a typical web app for getting the server component up and running, which makes sense since I’m building a server. Taking a look at the code below (Program.fs) at a high level, the server listens on the default MQTT port (1883) and passes the traffic to a handler. If you’ve written a .NET Core web app before, much of this is familiar. The networking heavy lifting is set up when I create the host. After that, the hard stuff is done in the MQTT handler.

namespace MqttServer

open Microsoft.AspNetCore
open Microsoft.AspNetCore.Builder
open Microsoft.AspNetCore.Connections
open Microsoft.AspNetCore.Hosting
open Microsoft.Extensions.Configuration
open Microsoft.Extensions.DependencyInjection

module Program =
    // Standard web-template Startup; nothing to configure yet
    type Startup private () =
        new(configuration: IConfiguration) as this =
            Startup()
            then this.Configuration <- configuration

        member this.ConfigureServices(services: IServiceCollection) = ()

        member this.Configure(app: IApplicationBuilder, env: IWebHostEnvironment) = ()

        member val Configuration: IConfiguration = null with get, set

    // Listen on the default MQTT port (1883) and pass each connection to the handler
    let CreateHostBuilder args =
        WebHost.CreateDefaultBuilder(args)
            .UseKestrel(fun options ->
                options.ListenLocalhost
                    (1883,
                     (fun builder ->
                         builder.UseConnectionHandler<MqttConnectionHandler>()
                         |> ignore))).UseStartup<Startup>()

    [<EntryPoint>]
    let main args =
        CreateHostBuilder(args).Build().Run()
        0

Next, I need to put together the handler for the “business logic” of the server. Before getting into the real MQTT implementation, I feel it’s important to show what a bare minimum implementation looks like. For that I’m going to implement echo server functionality (mine will just run on port 1883 instead of 7). What is important to understand is what gets passed to the connection handler and how the interaction with the client works. Once that’s in place I’ll look at expanding to actual MQTT logic.

First, incoming data is retrieved using connection.Transport.Input.ReadAsync(). The result’s buffer exposes the data as a series of memory segments (of the type ReadOnlyMemory<byte>), which I walk with an enumerator. At this point it’s my responsibility to figure out what I want to do with these segments. In future posts I’ll need to get more advanced to properly handle MQTT packets. But here I can just take what I get and echo it back to the client. For that I make a connection.Transport.Output.WriteAsync() call. Once a buffer has been processed, a call to connection.Transport.Input.AdvanceTo() marks the data as consumed. These are the main pieces you need to be able to interact with the client. Beyond that there is the typical housekeeping, like ending the loop if the client closes the connection. For debugging purposes I’ll use a bytesToString function to let me peek into the individual segments.

namespace MqttServer

open System
open System.Threading.Tasks
open Microsoft.AspNetCore.Connections
// Project modules; nothing from them is used by the echo code below
open Utils
open MqttState

/// MQTT Handler
type MqttConnectionHandler() =
    inherit ConnectionHandler()

    /// Show the bytes of a pipeline segment as hex
    let bytesToString (segment: ReadOnlyMemory<byte>) : string =
        segment.ToArray()
        |> Array.map (fun (x: byte) -> x.ToString("X2"))
        |> String.concat " "
        |> sprintf "Length: %3d Data: %s" segment.Length

    /// Echo server
    override __.OnConnectedAsync(connection: ConnectionContext) =
        async {
            let mutable clientConnected = true
            while clientConnected do
                // Wait for the next chunk of data from the client
                let! result =
                    connection.Transport.Input.ReadAsync().AsTask()
                    |> Async.AwaitTask

                if (result.IsCanceled || result.IsCompleted) then
                    clientConnected <- false
                else
                    let buffer = result.Buffer

                    // Current is read before the first MoveNext(), so the first
                    // segment of each read is empty (likely the "Length: 0" lines
                    // in the output below); the real segments follow.
                    let mutable iter = buffer.GetEnumerator()
                    let mutable continueBuffer = true
                    while continueBuffer do
                        let segment = iter.Current
                        printfn "Segment: %s" (bytesToString segment)
                        // Echo the segment back to the client
                        let! _ =
                            connection.Transport.Output.WriteAsync(segment).AsTask()
                            |> Async.AwaitTask
                        continueBuffer <- iter.MoveNext()

                    // Mark everything in this buffer as consumed
                    connection.Transport.Input.AdvanceTo(buffer.End)
        } |> Async.StartAsTask :> Task
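
To look at the segment walk on its own, here is a minimal sketch that runs without Kestrel. It builds a single-segment ReadOnlySequence<byte> as a stand-in for result.Buffer and formats each segment the same way the handler does. The describeSegments name and the sample bytes are my own assumptions for illustration, not part of the server. Unlike the handler, this version calls MoveNext() before reading Current, so I would not expect it to produce an empty leading segment.

```fsharp
open System
open System.Buffers
open System.Text

/// Same formatting as the handler's bytesToString helper.
let bytesToString (segment: ReadOnlyMemory<byte>) : string =
    segment.ToArray()
    |> Array.map (fun (x: byte) -> x.ToString("X2"))
    |> String.concat " "
    |> sprintf "Length: %3d Data: %s" segment.Length

/// Hypothetical helper: formats every segment in a buffer,
/// advancing the enumerator before each read of Current.
let describeSegments (buffer: ReadOnlySequence<byte>) : string list =
    let mutable iter = buffer.GetEnumerator()
    let lines = ResizeArray<string>()
    while iter.MoveNext() do
        lines.Add(bytesToString iter.Current)
    List.ofSeq lines

// Assumed sample data: one segment holding "It worked!\n" as ASCII bytes.
let sampleBytes = Encoding.ASCII.GetBytes("It worked!\n")
let sample = ReadOnlySequence<byte>(sampleBytes)

describeSegments sample |> List.iter (printfn "Segment: %s")
// prints "Segment: Length:  11 Data: 49 74 20 77 6F 72 6B 65 64 21 0A"
```

A buffer coming off a real connection can span several segments, in which case the list would simply have more than one entry.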

With these basic pieces in place, it’s time to take it for a spin. In one console I fire up my fresh new server; in the other I send over some text using netcat. Below are the results. It works as expected. First step of the mission accomplished.

# Server Console:
$ dotnet run
Hosting environment: Development
Content root path: /home/codesuji/projects/mqtt-server/src
Now listening on: http://localhost:1883
Application started. Press Ctrl+C to shut down.
Segment: Length: 0 Data:
Segment: Length: 24 Data: 48 65 72 65 20 69 73 20 61 20 74 65 73 74 20 6D 65 73 73 61 67 65 2E 0A
Segment: Length: 0 Data:
Segment: Length: 11 Data: 49 74 20 77 6F 72 6B 65 64 21 0A

# Client Console:
$ nc 127.0.0.1 1883
Here is a test message.
Here is a test message.
It worked!
It worked!
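
If netcat isn’t handy, the same check can be scripted. This is a minimal sketch, assuming the echo server is already running and listening on 127.0.0.1:1883. The echoRoundTrip helper is a hypothetical name of my own, and it uses a plain TcpClient rather than anything MQTT-specific.

```fsharp
open System
open System.Net.Sockets
open System.Text

/// Hypothetical helper: sends `message` to host:port and returns what comes back.
/// Assumes the server echoes exactly the bytes it receives.
let echoRoundTrip (host: string) (port: int) (message: string) : string =
    use client = new TcpClient(host, port)
    let stream = client.GetStream()
    let payload = Encoding.ASCII.GetBytes(message)
    stream.Write(payload, 0, payload.Length)

    // TCP is a byte stream, so the echo may arrive across several reads.
    let received = Array.zeroCreate<byte> payload.Length
    let mutable total = 0
    let mutable connectionOpen = true
    while connectionOpen && total < received.Length do
        let n = stream.Read(received, total, received.Length - total)
        if n = 0 then connectionOpen <- false else total <- total + n

    Encoding.ASCII.GetString(received, 0, total)
```

With the server running, a call like echoRoundTrip "127.0.0.1" 1883 "It worked!\n" should hand back the same text that was sent.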

This is a good place to stop. I have my foundation built using Kestrel for very basic server connectivity. It didn’t take that much to put together either. Next post I’ll start digging into the MQTT-specific implementation. That is when it starts to get really fun.