Building an MQTT server in F# - Part 3

Read Time: 21 minutes

Today I continue the MQTT server series by expanding capabilities to include Publish and Subscribe packets. This will build upon the previous F#-based server foundation.

Looking back at the series, I previously built basic networking and Connect functionality into an MQTT server. Just being able to perform a connection handshake is nice, but not real useful. This means it is time to take the next step to support core MQTT functionality, namely the ability to subscribe to topics and publish messages. As always, it is time to turn to the MQTT Spec. Once connected, a client is permitted to perform several actions. The two I’m interested in today are subscribe to topics, and publish messages. An MQTT publish message consists of two parts, a topic and a message. The topic is how different types of messages are grouped and filtered through the system. A system may have topics like ‘/lights/status’ or ‘/weather-station/1/temperature’. The messages can be in any format, but processed through MQTT as bytes. It is up to the MQTT user to define the format of topics and messages. One or more clients subscribe to particular topics. Additionally, clients publish messages, with an attached topic. The MQTT server then forwards messages onto clients based on their particular subscriptions. It is a basic pub/sub model, the important note here is that subscriptions are handled at a topic level.

There are a couple things I need to handle: parsing new MQTT packets, then processing those packets. But first I need to define the types. F# again supports this process well, as types drive everything. Since my immediate focus is on publishing packets. I need to be able to handle a Publish packet. I also need to be able build a PubAck packet to send back to the publishing client. The server also needs to be able to send a Publish packet to other clients. These types are modeled from the spec, so it is pretty direct to put these together. Since the server will be sending messages out, I need to support serialization of these types as well.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/// Publish packet - Fixed header
type FixedPublishHeader =
{ PacketType: PacketType
Dup: bool
Qos: byte
Retain: bool
RemainingLength: int
Size: int
} with
member __.Serialize() =
let flags =
(if __.Dup then byte 0b100 else byte 0b0000)
||| (__.Qos <<< 1)
||| (if __.Retain then byte 0b0001 else byte 0b0001)

let byte1 = (byte __.PacketType <<< 4) ||| flags
[| [| byte1 |]
(encodePacketRemainingLength __.RemainingLength) |]
|> Array.concat

/// Publish packet - Variable header
type VariablePublishHeader =
{ Topic: string
PacketIdentifier: int
Size: int
} with
member __.Serialize() =
[| stringToSerializedBytes __.Topic
[| byte (__.PacketIdentifier >>> 8)
byte (__.PacketIdentifier &&& 0xFF) |] |]
|> Array.concat

/// Publish packet - Payload
type PublishPayload =
{
Message: byte []
} with
member __.Serialize() = __.Message

/// Publish packet
type PublishPacket =
{ FixedHeader: FixedPublishHeader
VariableHeader: VariablePublishHeader
Payload: PublishPayload
}
interface IResponsePacket with
member __.Serialize() =
[| __.FixedHeader.Serialize()
__.VariableHeader.Serialize()
__.Payload.Serialize() |]
|> Array.concat

/// PubAck packet - Fixed header
type PubAckFixedHeader =
{ PacketType: PacketType
RemainingLength: int }
member __.Serialize() =
[| [| byte __.PacketType <<< 4 |]
(encodePacketRemainingLength __.RemainingLength) |]
|> Array.concat

/// PubAck packet - Variable header
type PubAckVariableHeader =
{ PacketIdentifier: int }
member __.Serialize() =
[| byte (__.PacketIdentifier >>> 8)
byte (__.PacketIdentifier &&& 0xFF) |]

/// PubAck packet
type PubAckResponsePacket =
{ FixedHeader: PubAckFixedHeader
VariableHeader: PubAckVariableHeader }
interface IResponsePacket with
member __.Serialize() =
[| __.FixedHeader.Serialize()
__.VariableHeader.Serialize() |]
|> Array.concat

With the types in place, it is time to dig into the parsing. Since I’m adding a new type I need to update the main control function. Beyond that, the fixed header is simple to parse. In fact, most of the fixed header parsing is very similar. The only real difference between them is the required flags per message type. Here, I need to extract Dup, QoS, and Retain flags; simply done with some bit-shifts and ANDing. The message’s topic is stored in the variable header, so I need to do the standard string extraction technique that I used before. Messages also have an associated Id, also in the variable header. The payload, or message, is just an array of bytes. With MQTT, it is up to the clients to determine the payload encoding method. This is nice, I’ve used this flexibility where messages are sometimes encoded UTF-8 strings, other times they are raw byte messages off of a device, or even images.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/// Handle buffer parsing
let parseBufferSegment (buffer: ReadOnlyMemory<byte>) :RequestPacket option =
...
match packetType with
| PacketType.Connect -> parseConnectPacket buffer'
| PacketType.Publish -> parsePublishPacket buffer'
| _ -> printfn "ERROR: Unsupported packet type: %A" packetType
None
...

/// Parse buffer for a Publish packet
let parsePublishPacket (buffer: ReadOnlySpan<byte>) :RequestPacket option =
let position = 0
let (fixedHeader, position) = parsePublishFixedHeader buffer
let (variableHeader, position) = parsePublishVariableHeader buffer fixedHeader position
let (payload, position) = parsePublishPayload buffer fixedHeader variableHeader position

Some(RequestPacket.Publish
{ RequestPublishPacket.FixedHeader = fixedHeader
VariableHeader = variableHeader
Payload = payload })

/// Extract fixed header (Publish) from buffer
let parsePublishFixedHeader (buffer: ReadOnlySpan<byte>) :(FixedPublishHeader * int) =
let position = 0
let packetType = getPacketType buffer.[position]
let (remainingLength, position') = getPacketRemainingLength buffer (position + 1)

let dup = buffer.[position] &&& 8uy <> 0uy
let qos = buffer.[position] &&& 6uy >>> 1
let retain = buffer.[position] &&& 1uy <> 0uy

({ FixedPublishHeader.PacketType = packetType
Dup = dup
Qos = qos
Retain = retain
RemainingLength = remainingLength
Size = position' },
position')

/// Extract variable header (Publish) from buffer
let parsePublishVariableHeader (buffer: ReadOnlySpan<byte>)
(fixedHeader: FixedPublishHeader)
(position: int)
: (VariablePublishHeader * int) =
let startPosition = position

// Topic
let (topic, pos) = extractString buffer position
let position = pos

// Packet identifier
let packetIdentifier = twoBytesToInt (buffer.Slice(position, 2))

let position = position + 2

({ VariablePublishHeader.Topic = topic
PacketIdentifier = packetIdentifier
Size = position - startPosition },
position)

/// Extract payload (Publish) from buffer
let parsePublishPayload (buffer: ReadOnlySpan<byte>)
(fixedHeader: FixedPublishHeader)
(variableHeader: VariablePublishHeader)
(position: int)
: (PublishPayload * int) =
let dataLength = fixedHeader.RemainingLength - variableHeader.Size

let data = buffer.Slice(position, dataLength).ToArray()

let position = position + dataLength

({ PublishPayload.Message = data }, position)

At this point the packet is parsed, and needs to be processed. Here a couple things need to happen. The server needs to send a PubAck packet back to the publishing client. It also needs to interrogate subscriptions and forward the message onto the appropriate clients. Additionally, the server needs to retain messages for future publishing, if the message is flagged as such. The first step in all of this is construction of the appropriate packets. Once done, they are passed into the state processor, where the filtering and sending are done.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/// Process an mqtt request packet
let processPacket (state: StateManager) (context: ConnectionContext) (packet: RequestPacket option) =
match packet with
| Some(RequestPacket.Connect (p)) -> processConnect state context p
| Some(RequestPacket.Publish (p)) -> processPublish state context p
| Some(RequestPacket.Disconnect (p)) -> processDisconnect state context
| None ->
printfn "ERROR: Invalid packet data. Disconnecting..."
processDisconnect state context

/// Process a Publish packet
let processPublish (state: StateManager) (context: ConnectionContext) (packet: PublishPacket) :ProcessResult option =
// Build pubaack packet
let ackVariableHeader = { PubAckVariableHeader.PacketIdentifier = packet.VariableHeader.PacketIdentifier }
let variableHeader' = ackVariableHeader.Serialize()

let ackFixedHeader =
{ PubAckFixedHeader.PacketType = PacketType.SubAck
RemainingLength = variableHeader'.Length }

let pubAckPacket =
{ PubAckResponsePacket.FixedHeader = ackFixedHeader
VariableHeader = ackVariableHeader }
let ackMessage = serializeResponsePacket pubAckPacket

// Build a publish packet for subscribed clients
let pubVariableHeader =
{ VariablePublishHeader.Topic = packet.VariableHeader.Topic
PacketIdentifier = packet.VariableHeader.PacketIdentifier
Size = 0}
let pubVariableHeader' = pubVariableHeader.Serialize()

let pubPayload = { PublishPayload.Message = packet.Payload.Message }
let pubPayload' = pubPayload.Serialize()

let remainingLength = (pubVariableHeader'.Length + pubPayload'.Length)

let pubFixedHeader =
{ FixedPublishHeader.PacketType = PacketType.Publish
Dup = packet.FixedHeader.Dup
Qos = packet.FixedHeader.Qos
Retain = packet.FixedHeader.Retain
RemainingLength = remainingLength
Size = 0 }

let pubMessage =
{ PublishPacket.FixedHeader = pubFixedHeader
VariableHeader = pubVariableHeader
Payload = pubPayload }

let pubMessage = serializeResponsePacket pubMessage

state.Post(StateMessage.Publish(context, pubMessage, packet.VariableHeader.Topic, ackMessage, packet.FixedHeader.Retain))

None

Just like previous posts, the state manager controls message sending and tracking state. First it sends a PubAck back to the client. After that it needs to do subscription filtering and sends. The filtering happens based on text matching as defined in the spec. It supports exact string matches as well as wildcarding used the # and * characters. Once the destination clients are determined it is a matter of sending. This can be done in parallel, which can be easily accomplished using Async.Parallel. I realize this isn’t truly unique of F#, but I love how easy the semantics are to parallelize types of workloads. This is definitely a win in the F# column. To ensure proper QoS, it needs to track messages sent to clients. This way the server can resend lost messages if appropriate, etc.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/// Interface to state manager
let stateManager () =
...
match msg with
| Connect (connection, message) -> stateConnect s connection message
| Publish (connection, message, topic, ack, retain) -> statePublish s connection message topic ack retain
| Disconnect (connection) -> stateDisconnect s connection
...

/// Handle message publishing
let statePublish (state: MqttState)
(connection: ConnectionContext)
(message: byte [])
(topic: string)
(ackMessage: byte [])
(retain: bool) =
async {
// Write puback to client
let! success = writeMessage connection ackMessage

// Process filters and send to other clients
let matchingFilters =
state.ConnectedClients
|> Map.toArray
|> Array.collect (fun (k, v) -> v.Subscriptions |> Array.map (fun x -> (k, x)))
|> Array.filter (fun (k, v) -> isSubscriptionMatch topic v.Topic)

let results =
matchingFilters
|> Array.map (fun (k, v) -> (k, v, state.ConnectedClients.TryFind(k)))
|> Array.filter (fun (_, _, x) -> x.IsSome)
|> Array.map (fun (k, v, c) -> writeMessage c.Value.Context message)
|> Async.Parallel
|> Async.RunSynchronously

let clients =
matchingFilters
|> Array.map (fun (k, v) -> { ActiveMessageClient.ClientId = k; Qos = v.Qos })
|> Array.toList

let activeMessage =
{ ActiveMessage.Message = message
Retained = retain
Clients = clients }

let state' = { state with ActiveMessages = activeMessage :: state.ActiveMessages }
return state'
}

That wasn’t so bad. Now the server can handle Publish messages. This means I need to handle the other side of the equation, subscriptions. Publishing doesn’t mean much if no one is listening. For this interaction, the server needs to be able to handle a Subscribe message as well as return a SubAck message to the client. Like before, I start with the types. Again, I need to handle serialization so I can send these messages to clients.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/// Subscribe packet - Fixed header
type FixedSubscribeHeader =
{ PacketType: PacketType
RemainingLength: int
Size: int }

/// Subscribe packet - Variable header
type VariableSubscribeHeader = { PacketIdentifier: int; Size: int }

type TopicFilter = { Topic: string; Qos: byte }

type SubscribePayload = { Filters: TopicFilter [] }

/// Subscribe packet
type SubscribePacket =
{ FixedHeader: FixedSubscribeHeader
VariableHeader: VariableSubscribeHeader
Payload: SubscribePayload }

// SubAck packet - Fixed header
type SubAckFixedHeader =
{ PacketType: PacketType
RemainingLength: int }
member __.Serialize() =
[| [| byte __.PacketType <<< 4 |]
(encodePacketRemainingLength __.RemainingLength) |]
|> Array.concat

// SubAck packet - Variable header
type SubAckVariableHeader =
{ PacketIdentifier: int }
member __.Serialize() =
[| byte (__.PacketIdentifier >>> 8)
byte (__.PacketIdentifier &&& 0xFF) |]

// SubAck packet - Payload
type SubAckPayload =
{ Qoses: byte [] }
member __.Serialize() = __.Qoses

// SubAck packet
type SubAckPacket =
{ FixedHeader: SubAckFixedHeader
VariableHeader: SubAckVariableHeader
Payload: SubAckPayload }
interface IResponsePacket with
member __.Serialize() =
[| __.FixedHeader.Serialize()
__.VariableHeader.Serialize()
__.Payload.Serialize() |]
|> Array.concat

This almost feels repetitive, but now parse the incoming packets. Like the publish, there is a packet Id in the variable header for tracking. A subscription message can contain multiple subscriptions. As a result, the message is a list of topics along with their desired QoS.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/// Extract a packet from the buffer
let parseBufferSegment (buffer: ReadOnlyMemory<byte>) :RequestPacket option =
...
match packetType with
| PacketType.Connect -> parseConnectPacket buffer'
| PacketType.Publish -> parsePublishPacket buffer'
| PacketType.Subscribe -> parseSubscribePacket buffer'
| _ -> printfn "ERROR: Unsupported packet type: %A" packetType
None
...

/// Parse buffer for a Subscribe packet
let parseSubscribePacket (buffer: ReadOnlySpan<byte>) :RequestPacket option =
let position = 0
let (fixedHeader, position) = parseSubscribeFixedHeader buffer
let (variableHeader, position) = parseSubscribeVariableHeader buffer fixedHeader position
let (payload, position) = parseSubscribePayload buffer fixedHeader variableHeader position

Some(RequestPacket.Subscribe
{ SubscribePacket.FixedHeader = fixedHeader
VariableHeader = variableHeader
Payload = payload })

/// Extract fixed header (Subscribe) from buffer
let parseSubscribeFixedHeader (buffer: ReadOnlySpan<byte>) :(FixedSubscribeHeader * int) =
let position = 0
let packetType = getPacketType buffer.[position]
let (remainingLength, position') = getPacketRemainingLength buffer (position + 1)

({ FixedSubscribeHeader.PacketType = packetType
RemainingLength = remainingLength
Size = position' },
position')

/// Extract variable header (Subscribe) from buffer
let parseSubscribeVariableHeader (buffer: ReadOnlySpan<byte>)
(fixedHeader: FixedSubscribeHeader)
(position: int)
: (VariableSubscribeHeader * int) =
let startPosition = position

// packet identifier
let packetIdentifier = twoBytesToInt (buffer.Slice(position, 2))
let position = position + 2

({ VariableSubscribeHeader.PacketIdentifier = packetIdentifier
Size = position - startPosition },
position)

/// Extract payload (Subscribe) from buffer
let parseSubscribePayload (buffer: ReadOnlySpan<byte>)
(fixedHeader: FixedSubscribeHeader)
(variableHeader: VariableSubscribeHeader)
(start: int)
: (SubscribePayload * int) =
let mutable position = start
let mutable filters = []

let payloadSize = fixedHeader.RemainingLength - variableHeader.Size
while position - start < payloadSize do
let (data, pos) = extractString buffer position
let qos = buffer.[pos]
position <- pos + 1
filters <- { TopicFilter.Topic = data; Qos = qos } :: filters

({ SubscribePayload.Filters = filters |> List.toArray }, position)

Once parsed, we’re back to processing. The match expression is growing as I add functionality. In this case the server needs to send a SubAck back to the client. Then it can pass the subscriptions onto the stateManager for tracking. This is a simple task of just updating the specified client’s list of subscriptions. If you remember from above this is how the Publish determines who sees the messages. When broken down into it’s components, there isn’t much interesting here to see, but again, F# makes this process clean.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
let processPacket (state: StateManager) (context: ConnectionContext) (packet: RequestPacket option) =
match packet with
| Some(RequestPacket.Connect (p)) -> processConnect state context p
| Some(RequestPacket.Publish (p)) -> processPublish state context p
| Some(RequestPacket.PubComp (p)) -> processPubComp state context p
| Some(RequestPacket.Subscribe (p)) -> processSubscribe state context p
| Some(RequestPacket.Disconnect (p)) -> processDisconnect state context
| None ->
printfn "ERROR: Invalid packet data. Disconnecting..."
processDisconnect state context

/// Process a Subscribe packet
let processSubscribe (state: StateManager)
(context: ConnectionContext)
(packet: SubscribePacket)
: ProcessResult option =
let variableHeader = { SubAckVariableHeader.PacketIdentifier = packet.VariableHeader.PacketIdentifier }
let variableHeader' = variableHeader.Serialize()

let ackPayload =
{ SubAckPayload.Qoses =
packet.Payload.Filters
|> Array.map (fun x -> x.Qos) }
let payload' = ackPayload.Serialize()

let fixedHeader =
{ SubAckFixedHeader.PacketType = PacketType.SubAck
RemainingLength = (variableHeader'.Length + payload'.Length) }

let subAckPacket =
{ SubAckPacket.FixedHeader = fixedHeader
VariableHeader = variableHeader
Payload = ackPayload }

let message = serializeResponsePacket subAckPacket
state.Post(StateMessage.Subscribe(context, message, packet.Payload.Filters))

None

/// Interface to state manager
let stateManager () =
...
match msg with
| Connect (connection, message) -> stateConnect s connection message
| Publish (connection, message, topic, ack, retain) -> statePublish s connection message topic ack retain
| Subscribe (connection, message, subscriptions) -> stateSubscribe s connection message subscriptions
| Disconnect (connection) -> stateDisconnect s connection
...

/// Handle client subscription to topic(s)
let stateSubscribe (state: MqttState) (connection: ConnectionContext) (message: byte []) (subscriptions: TopicFilter []) =
async {
let! success = writeMessage connection message

// Add subscription filter
let client = Map.tryFind connection.ConnectionId state.ConnectedClients

let state' =
match client with
| Some (c) ->
let subscriptions' = Array.concat [| subscriptions; c.Subscriptions |]
let client' = { c with Subscriptions = subscriptions' }

let clients' =
state.ConnectedClients
|> Map.remove connection.ConnectionId
|> Map.add connection.ConnectionId client'

{ state with ConnectedClients = clients' }
| None -> state

return state'
}

Now that all the parts in place, it’s time to see it in action. I fire up the new server and run my trusty client script. The goal here is to connect, subscribe to a topic, publish to that topic, and see a result on the client side. Below you can see that is exactly what happens. On the server side you can track the messages as they arrive. The state debugging shows connected clients as well as their subscriptions. I also note what messages the server is sending back to the client. Once the client has sent it’s message it disconnects, thus causing the server to remove it from it’s state tracking. On the client side we get confirmation of this process, after publishing a message I can see the result come back through the appropriate channels. Not shown here, but I did several tests with multiple clients, topics, and subscriptions. All appears to work as intended.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Server
$ dotnet run
Content root path: /home/codesuji/projects/mqtt-server/src
Now listening on: http://localhost:1883
Application started. Press Ctrl+C to shut down.
Packet (60): Length: 60 Data: 10 3A 00 04 4D 51 54 54 04 C6 00 64 00 0F 6D 71 74 74 6A 73 5F 62 39 33 35 37 30 35 65 00 06 69 2D 64 69 65 64 00 07 63 6C 69 65 6E 74 41 00 05 75 73 65 72 31 00 05 70 61 73 73 31
PacketType: Connect
Sending: [|32uy; 2uy; 0uy; 0uy|]
State:
Clients:
0HM4UKIREI3HR Subscriptions:
Messages:
Packet (16): Length: 16 Data: 82 0E A7 0F 00 09 74 6F 70 69 63 2D 66 6F 6F 00
PacketType: Subscribe
Sending: [|144uy; 3uy; 167uy; 15uy; 0uy|]
State:
Clients:
0HM4UKIREI3HR Subscriptions: topic-foo
Messages:
Packet (29): Length: 29 Data: 30 1B 00 09 74 6F 70 69 63 2D 66 6F 6F 4D 79 20 66 69 72 73 74 20 6D 65 73 73 61 67 65
PacketType: Publish
Sending: [|144uy; 2uy; 77uy; 121uy|]
*************************************************************
matches: [|("0HM4UKIREI3HR", { Topic = "topic-foo" Qos = 0uy })|]
Sending: [|49uy; 27uy; 0uy; 9uy; 116uy; 111uy; 112uy; 105uy; 99uy; 45uy; 102uy; 111uy;
111uy; 77uy; 121uy; 32uy; 102uy; 105uy; 114uy; 115uy; 116uy; 32uy; 109uy;
101uy; 115uy; 115uy; 97uy; 103uy; 101uy|]
Packet (2): Length: 2 Data: E0 00
PacketType: Disconnect
State:
Clients:
Messages:
State:
Clients:
Messages:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Client
$ node mqtt-client.js
Connected Packet {
cmd: 'connack',
retain: false,
qos: 0,
dup: false,
length: 2,
topic: null,
payload: null,
sessionPresent: false,
returnCode: 0
}
Subscribed.
Sending: My first message
Published.
Message Received: topic-foo :: My first message
Connection closed.

At this point, I’ve completed the primary loop of MQTT functionality. Clients can connect, publish, and subscribe to my MQTT server. I’d say that’s pretty cool. This is by no means complete, there are still many smaller supporting bits to add, but this gets it to a point where it is easy to iteratively improve. Adding Subscribe and Publish weren’t particularly difficult once I had the underlying foundation in place, including types defining the system. I think this is another good example of how F# can be a valuable tool when writing servers. A solid underlying structure and a productive language can go a long way to ensure a properly functioning system. I hope you enjoyed this little adventure and found it useful.