Building an MQTT server in F# - Part 2

Read Time: 27 minutes

Today I’ll continue the process of building an MQTT server with F#. The basic networking capabilities I put together last time will be expanded upon to support basic MQTT functionality, specifically the MQTT Connect message.

In my previous post I set up a simple F# server listening on port 1883, but there was no MQTT functionality. Today it’s time to start wiring in some real functionality. Handling a Connect message seems like a logical place to start, so that’s where I’ll begin. Before getting into the code, I want to take a quick look at the MQTT protocol. I’m going to gloss over some details, but the MQTT spec is always there if you want to see more.

A quick description of the client and server interaction is in order. The connection handshake is pretty basic. The client sends a connect request. Assuming all is well with the request (supported version, successful authentication, etc.), the server then sends a connection ack back to the client. At this point the connection is established, and the client sends subscribe and publish messages as appropriate. The server can now send the client any messages that meet its subscription criteria.

What do MQTT packets look like? At a high level, a packet is defined by three components.

  1. Fixed Header - A common header across all packet types. Byte 1 contains the packet’s type and standard flags. Bytes 2+ contain the packet’s remaining length, to aid in parsing.
  2. Variable Header - This is a packet-specific metadata header. As a result, the content and length of this will vary depending on the packet-type in question.
  3. Payload - Here is the actual data. As a side note, not all packet types have a payload.

As alluded to, there are different types of messages that the server supports, each packet type having its own packet-specific variable header and payload structure. The ones I’ll concern myself with today are Connect (client -> server message) and ConnAck (server -> client message). What does this look like when modeling MQTT packets in F#? The structure of the Connect packet is taken from the spec, and is pretty straightforward.

/// Mqtt Packet type
type PacketType =
    | Connect = 1uy
    | ConnAck = 2uy
    | Publish = 3uy
    | PubAck = 4uy
    | PubRec = 5uy
    | PubRel = 6uy
    | PubComp = 7uy
    | Subscribe = 8uy
    | SubAck = 9uy
    | Unsubscribe = 10uy
    | UnsubAck = 11uy
    | PingReq = 12uy
    | PingResp = 13uy
    | Disconnect = 14uy
    | Reserved = 15uy

/// Connect packet - Fixed header
type RequestConnectFixedHeader =
    { PacketType: PacketType
      RemainingLength: int
      Size: int }

/// Connect packet - Flags
type ConnectFlags =
    { UserName: bool
      Password: bool
      WillRetain: bool
      WillQoS: int
      WillFlag: bool
      CleanSession: bool
      Reserved: bool }

/// Connect packet - Variable header
type RequestConnectVariableHeader =
    { Name: string
      Level: int
      Flags: ConnectFlags
      KeepAlive: int
      Size: int }

/// Connect packet - Payload
type RequestConnectPayload =
    { ClientIdentifier: string option
      WillTopic: string option
      WillMessage: byte [] option
      UserName: string option
      Password: string option }

/// Connect packet
type RequestConnectPacket =
    { FixedHeader: RequestConnectFixedHeader
      VariableHeader: RequestConnectVariableHeader
      Payload: RequestConnectPayload }

/// Request packet
type RequestPacket =
    | Connect of RequestConnectPacket
    | Empty

Now that the Connect packet is defined, it’s time to parse the byte stream. I handle this in two steps. First, determine what the packet type is by peeking at the first byte. Second, I pass the stream to the appropriate parser based on the type. The packet type is stored in the upper four bits of the first byte of the MQTT packet, so I need to bitshift and then match to determine the packet type. Once I have that I parse a packet. Successful parsing results in Some RequestPacket; otherwise I just return None. The option type allows the calling control flow to easily process data based on parsing results.

/// Given a byte (first byte from fixed header), determine the packet type
let getPacketType (headerByte: byte) : PacketType =
    match headerByte >>> 4 with
    | 1uy -> PacketType.Connect
    | 2uy -> PacketType.ConnAck
    | 3uy -> PacketType.Publish
    | 4uy -> PacketType.PubAck
    | 5uy -> PacketType.PubRec
    | 6uy -> PacketType.PubRel
    | 7uy -> PacketType.PubComp
    | 8uy -> PacketType.Subscribe
    | 9uy -> PacketType.SubAck
    | 10uy -> PacketType.Unsubscribe
    | 11uy -> PacketType.UnsubAck
    | 12uy -> PacketType.PingReq
    | 13uy -> PacketType.PingResp
    | 14uy -> PacketType.Disconnect
    | 15uy -> PacketType.Reserved
    | _ -> PacketType.Reserved

/// Extract a packet from the buffer
let parseBufferSegment (buffer: ReadOnlyMemory<byte>) : RequestPacket option =
    try
        let buffer' = buffer.Span
        let packetType = getPacketType buffer'.[0]

        match packetType with
        | PacketType.Connect -> parseRequestConnectPacket buffer'
        | _ ->
            printfn "ERROR: Unsupported packet type: %A" packetType
            None
    with
    // Any exception raised while parsing is treated as a failed parse
    | ex ->
        printfn "ERROR: Parsing packet: %s" ex.Message
        None
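
As a quick sanity check of the shift, here is a minimal sketch that splits a first byte into its two halves. The helper names packetTypeBits and flagBits are only for illustration and are not part of the server code. The two sample bytes are the first bytes of the packets that show up in the server output at the end of this post.

```fsharp
/// Upper four bits of the first fixed-header byte: the packet type.
let packetTypeBits (headerByte: byte) : byte = headerByte >>> 4

/// Lower four bits of the first fixed-header byte: the type-specific flags.
let flagBits (headerByte: byte) : byte = headerByte &&& 0x0Fuy

// 0x10 = 0001 0000 -> type 1 (Connect), flags 0
printfn "0x10 -> type %d, flags %d" (packetTypeBits 0x10uy) (flagBits 0x10uy)

// 0x82 = 1000 0010 -> type 8 (Subscribe), flags 2
printfn "0x82 -> type %d, flags %d" (packetTypeBits 0x82uy) (flagBits 0x82uy)
```

Because a byte shifted right by four can only hold 0 through 15, the wildcard case in getPacketType only ever catches 0, which the spec also reserves.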

There are a couple of ways I could’ve handled the parsing. My approach is to leverage the Span provided to the parsing function. This means I need to track the current position as I parse byte by byte through the packet. There is also a common pattern I’ll follow. It is often, but not always, the case that the variable header parsing will depend on values in the fixed header, and the payload will depend on both headers. I’ll use a pattern of always passing in the previously parsed headers as well as the current index, in case the current parsing step requires that data. This means sometimes passing things I don’t need, but it reduces mental overhead to just always pass along all headers. This is something that often gets overlooked in design. Unless you need to squeeze every ounce of performance out of a system, designing to ease mental load is an important goal to strive for.

/// Parse buffer for a Connect packet
let parseRequestConnectPacket (buffer: ReadOnlySpan<byte>) : RequestPacket option =
    let position = 0
    let (fixedHeader, position) = parseRequestConnectFixedHeader buffer
    let (variableHeader, position) = parseRequestConnectVariableHeader buffer fixedHeader position
    let (payload, position) = parseRequestConnectPayload buffer fixedHeader variableHeader position

    Some(RequestPacket.Connect
            { RequestConnectPacket.FixedHeader = fixedHeader
              VariableHeader = variableHeader
              Payload = payload })

This has been mostly control flow until now. Now it’s time to dig into the parsing. First is the fixed header. Based on the spec it is at least two bytes. The first byte contains the packet type, and no flags. The remaining byte(s) hold the remaining length of the packet. To accommodate variable length packets (and save space) the length is encoded in a variable length field. When in F# I often find it tempting to rewrite algorithms in a more functional style. But there also isn’t a reason to make it harder for myself than I need to. Direct translation of the provided spec is the easiest path forward. Things like mutable are usually a code smell, but in this case it’s a reasonable trade-off that makes it easy to verify that I’m matching the spec’s algorithm.

/// Extract the packet's remaining length for the fixed header
/// start - Start of length
/// Returns: length, index immediately following length
let getPacketRemainingLength (buffer: ReadOnlySpan<byte>) (start: int) : (int * int) =
    let mutable position = start
    let mutable multiplier = 1
    let mutable value = 0
    let mutable keepGoing = true
    while keepGoing do
        let encodedByte = int buffer.[position]
        value <- value + (encodedByte &&& 127) * multiplier
        multiplier <- multiplier * 128
        if (multiplier > 128 * 128 * 128) then raise (Exception "Malformed remaining length")
        keepGoing <- encodedByte &&& 128 <> 0
        position <- position + 1

    (value, position)

(* Spec reference
multiplier = 1
value = 0
do
    encodedByte = 'next byte from stream'
    value += (encodedByte AND 127) * multiplier
    multiplier *= 128
    if (multiplier > 128*128*128)
        throw Error(Malformed Remaining Length)
while ((encodedByte AND 128) != 0) *)


/// Extract fixed header (Connect) from buffer
let parseRequestConnectFixedHeader (buffer: ReadOnlySpan<byte>) : (RequestConnectFixedHeader * int) =
    let position = 0
    let packetType = getPacketType buffer.[position]
    let (remainingLength, position') = getPacketRemainingLength buffer (position + 1)

    ({ RequestConnectFixedHeader.PacketType = packetType
       RemainingLength = remainingLength
       Size = position' },
     position')
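
To make the encoding concrete, here is a small worked example. It is a sketch rather than the server's code: decodeRemainingLength is a hypothetical recursive variant that works on a plain byte array so it can run on its own. It also checks the four-byte limit before reading another byte instead of after multiplying, which, as far as I can tell, is what lets a full four-byte length through.

```fsharp
/// Decode an MQTT remaining-length field starting at `start`.
/// Returns the decoded value and the index just past the field.
let decodeRemainingLength (bytes: byte[]) (start: int) : int * int =
    let rec loop position multiplier value =
        let encodedByte = int bytes.[position]
        // Low 7 bits carry data, weighted by the current multiplier
        let value' = value + (encodedByte &&& 127) * multiplier
        if (encodedByte &&& 128) = 0 then
            // High bit clear: this was the last length byte
            (value', position + 1)
        elif multiplier >= 128 * 128 * 128 then
            // High bit set on the fourth byte: a fifth byte is not allowed
            failwith "Malformed remaining length"
        else
            loop (position + 1) (multiplier * 128) value'
    loop start 1 0

// Connect packet from the server output in this post: 10 3A ...
// 0x3A has its high bit clear, so the length is a single byte: 58.
printfn "%A" (decodeRemainingLength [| 0x10uy; 0x3Auy |] 1)

// Two-byte example from the spec: 321 = 65 + 2 * 128 is encoded as C1 02.
printfn "%A" (decodeRemainingLength [| 0xC1uy; 0x02uy |] 0)
```

The first call returns (58, 2) and the second returns (321, 2). Adding the two fixed-header bytes to 58 gives the 60-byte packet length reported in the log.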

The variable header contains a bit more parsing to deal with. Specifically it contains four fields: Protocol Name, Level, Flags, and KeepAlive. The Flags byte is mostly boolean flags that I extract with bitshifts and ANDing. Level is an int for the supported MQTT version, and KeepAlive is a two-byte integer. The Name field is a string. The MQTT protocol follows a common pattern when storing strings: a two-byte length field followed by the UTF-8 encoded string of the specified length. This shows up a lot throughout the parsing process.

/// Extract variable header (Connect) from buffer
let parseRequestConnectVariableHeader (buffer: ReadOnlySpan<byte>)
                                      (fixedHeader: RequestConnectFixedHeader)
                                      (position: int)
                                      : (RequestConnectVariableHeader * int) =
    let startPosition = position

    // name
    let nameLength = twoBytesToInt (buffer.Slice(position, 2))
    let name = Encoding.UTF8.GetString(buffer.Slice(position + 2, nameLength))
    let position = position + 2 + nameLength

    // level
    let level = int buffer.[position]
    let position = position + 1

    // flags
    let flags = buffer.[position]
    let position = position + 1

    let (userName, password, willRetain, willQoS, willFlag, cleanSession, reserved) =
        (flags &&& 128uy <> 0uy,
         flags &&& 64uy <> 0uy,
         flags &&& 32uy <> 0uy,
         flags &&& 24uy >>> 3,
         flags &&& 4uy <> 0uy,
         flags &&& 2uy <> 0uy,
         flags &&& 1uy <> 0uy)

    let connectFlags =
        { ConnectFlags.UserName = userName
          Password = password
          WillRetain = willRetain
          WillQoS = int willQoS
          WillFlag = willFlag
          CleanSession = cleanSession
          Reserved = reserved }

    // Keepalive
    let keepAlive = twoBytesToInt (buffer.Slice(position, 2))
    let position = position + 2

    ({ RequestConnectVariableHeader.Name = name
       Level = level
       Flags = connectFlags
       KeepAlive = keepAlive
       Size = position - startPosition },
     position)
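
The twoBytesToInt helper is not shown in this post. A minimal sketch, assuming it reads the two bytes as a big-endian integer (most significant byte first, which is how the spec lays out its two-byte fields), is below. To show the whole header in action, it is applied to the variable header bytes of the Connect packet from the server output at the end of this post.

```fsharp
open System
open System.Text

/// Assumed implementation of twoBytesToInt: two bytes, big-endian, to int.
let twoBytesToInt (bytes: ReadOnlySpan<byte>) : int =
    (int bytes.[0] <<< 8) ||| int bytes.[1]

// Variable header bytes from the logged Connect packet:
// 00 04 4D 51 54 54 | 04 | C6 | 00 64
let header = [| 0x00uy; 0x04uy; 0x4Duy; 0x51uy; 0x54uy; 0x54uy; 0x04uy; 0xC6uy; 0x00uy; 0x64uy |]

let nameLength = twoBytesToInt (ReadOnlySpan<byte>(header, 0, 2)) // 4
let name = Encoding.UTF8.GetString(header, 2, nameLength)         // "MQTT"
let level = int header.[6]                                        // 4
let flags = header.[7]                                            // 0xC6 = 1100 0110
let keepAlive = twoBytesToInt (ReadOnlySpan<byte>(header, 8, 2))  // 100

// Same masks as the parser above, with explicit parentheses
let userName = (flags &&& 128uy) <> 0uy      // true
let password = (flags &&& 64uy) <> 0uy       // true
let willRetain = (flags &&& 32uy) <> 0uy     // false
let willQoS = int ((flags &&& 24uy) >>> 3)   // 0
let willFlag = (flags &&& 4uy) <> 0uy        // true
let cleanSession = (flags &&& 2uy) <> 0uy    // true

printfn "%s level %d keepAlive %d" name level keepAlive
printfn "userName=%b password=%b willRetain=%b willQoS=%d willFlag=%b cleanSession=%b"
    userName password willRetain willQoS willFlag cleanSession
```

The set flags line up with the payload of that packet, which carries a will topic, a will message, a user name, and a password after the client identifier.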

Finally, the payload. For a connect packet, what is in the payload is determined by the flags in the variable header. So here is mostly conditional extraction of values, which means most of the payload values are Option types. As an example, if the UserName flag was set in the header, then a UserName is expected in the payload.

/// Extract a byte array from a header or payload.
/// Requires the data is in the format: length (2bytes) value (length bytes)
/// Example: [ 0 5 97 98 99 100 101 ]
let extractBytes (buffer: ReadOnlySpan<byte>) (position: int) : (byte [] * int) =
    let length = twoBytesToInt (buffer.Slice(position, 2))
    let value = buffer.Slice(position + 2, length).ToArray()

    let position = position + 2 + length
    (value, position)

/// Extract bytes if the flag is true
let extractConditionalBytes (buffer: ReadOnlySpan<byte>) (position: int) (flag: bool) : (byte [] option * int) =
    if flag then
        let (value, position) = extractBytes buffer position
        (Some value, position)
    else
        (None, position)

/// Extract payload (Connect) from buffer
let parseRequestConnectPayload (buffer: ReadOnlySpan<byte>)
                               (fixedHeader: RequestConnectFixedHeader)
                               (variableHeader: RequestConnectVariableHeader)
                               (position: int)
                               : (RequestConnectPayload * int) =
    // Client Identifier
    let (clientIdentifier, position) = extractString buffer position

    // Get optional data from client if provided
    let (willTopic, position) = extractConditionalString buffer position variableHeader.Flags.WillFlag
    let (willMessage, position) = extractConditionalBytes buffer position variableHeader.Flags.WillFlag
    let (userName, position) = extractConditionalString buffer position variableHeader.Flags.UserName
    let (password, position) = extractConditionalString buffer position variableHeader.Flags.Password

    ({ ClientIdentifier = Some clientIdentifier
       WillTopic = willTopic
       WillMessage = willMessage
       UserName = userName
       Password = password },
     position)
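
The string helpers used above, extractString and extractConditionalString, are not shown in this post. The sketch below assumes they mirror extractBytes and extractConditionalBytes, decoding the value as UTF-8 instead of copying it to an array. The twoBytesToInt helper is likewise an assumed big-endian implementation, repeated here so the example runs on its own.

```fsharp
open System
open System.Text

/// Assumed helper: two bytes, big-endian, to int.
let twoBytesToInt (bytes: ReadOnlySpan<byte>) : int =
    (int bytes.[0] <<< 8) ||| int bytes.[1]

/// Sketch of extractString: a 2-byte length followed by that many UTF-8 bytes.
/// Returns the decoded string and the index just past it.
let extractString (buffer: ReadOnlySpan<byte>) (position: int) : string * int =
    let length = twoBytesToInt (buffer.Slice(position, 2))
    let value = Encoding.UTF8.GetString(buffer.Slice(position + 2, length))
    (value, position + 2 + length)

/// Sketch of extractConditionalString: only reads when the flag is set.
let extractConditionalString (buffer: ReadOnlySpan<byte>) (position: int) (flag: bool) : string option * int =
    if flag then
        let (value, position') = extractString buffer position
        (Some value, position')
    else
        (None, position)

// Tail of the logged Connect payload: 00 05 "user1" 00 05 "pass1"
let payload = [| 0x00uy; 0x05uy; 0x75uy; 0x73uy; 0x65uy; 0x72uy; 0x31uy; 0x00uy; 0x05uy; 0x70uy; 0x61uy; 0x73uy; 0x73uy; 0x31uy |]

let (userName, afterUserName) = extractConditionalString (ReadOnlySpan<byte>(payload)) 0 true
let (password, afterPassword) = extractConditionalString (ReadOnlySpan<byte>(payload)) afterUserName true
let (skipped, afterSkipped) = extractConditionalString (ReadOnlySpan<byte>(payload)) 0 false

printfn "%A %A (consumed %d bytes)" userName password afterPassword
```

When the flag is false nothing is consumed and the position comes back unchanged, which is what lets the payload parser chain the calls without branching.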

Once a Connect packet is parsed, it needs to be processed. Like the parsing code, right now the example only includes Connect. This will be expanded over future posts. The control flow is again just matching against type. Per the spec, I disconnect from the client if I get invalid data. This could either be due to failed parsing, or just an unsupported type.

/// Process an mqtt request packet
let processPacket (state: StateManager) (context: ConnectionContext) (packet: RequestPacket option) =
    match packet with
    | Some(RequestPacket.Connect (p)) -> processConnect state context p
    | None ->
        printfn "ERROR: Invalid packet data. Disconnecting..."
        processDisconnect state context

/// Disconnect client
let disconnectClient (state: StateManager) (context: ConnectionContext) =
    processDisconnect state context

Before I can fully process a Connect packet, I need to provide some supporting functions and types. Specifically I need to define a response packet for the client in the way of ConnAck. Like the request packets, it is composed of a fixed header and a variable header (ConnAck doesn’t have a payload, but many response packets do). Unlike request packets, I need to handle serialization for sending to the client. This is easy enough to accomplish with a Serialize function attached to the type. This is a direct serialization to bytes. Serialization can get a little hairy, but really at this point the toughest parts to deal with have been the byte parsing and the serialization. Thankfully everything else has been the “easy stuff”.

/// ConnAck response packet - Fixed header
type ConnAckFixedHeader =
    { PacketType: PacketType
      RemainingLength: int }
    member __.Serialize() =
        [| [| byte __.PacketType <<< 4 |]
           (encodePacketRemainingLength __.RemainingLength) |]
        |> Array.concat

/// ConnAck flags
type ConnAckFlags =
    { SessionPresent: bool }
    member __.Serialize() =
        // Note: `0uy &&& x` is always 0uy, so SessionPresent is always sent as 0,
        // which is what the client output at the end of this post shows.
        // Use `|||` (or return the conditional directly) to honor the flag.
        0uy &&& if __.SessionPresent then 1uy else 0uy

/// ConnAck response packet - Variable header
type ConnAckVariableHeader =
    { Flags: ConnAckFlags
      ReturnCode: ConnAckReturnCode }
    member __.Serialize() =
        [| __.Flags.Serialize()
           byte __.ReturnCode |]

/// ConnAck response packet
type ConnAckResponsePacket =
    { FixedHeader: ConnAckFixedHeader
      VariableHeader: ConnAckVariableHeader }
    interface IResponsePacket with
        member __.Serialize() =
            [| __.FixedHeader.Serialize()
               __.VariableHeader.Serialize() |]
            |> Array.concat
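
Two pieces used by these types are not shown in this post: encodePacketRemainingLength and ConnAckReturnCode. The sketch below is one way they could look, not necessarily how the server defines them. The encoder follows the encoding algorithm from the spec (the mirror image of the decoder), and the enum assumes the return code values from the MQTT 3.1.1 spec with case names of my own choosing apart from Accepted. The connAckBytes function is a hypothetical helper that just assembles the same bytes the record types produce.

```fsharp
/// Sketch of encodePacketRemainingLength: 7 data bits per byte,
/// least significant group first, high bit set when more bytes follow.
let encodePacketRemainingLength (length: int) : byte[] =
    if length < 0 || length > 268435455 then invalidArg "length" "Remaining length out of range"
    let rec loop (x: int) (acc: byte list) =
        let digit = x % 128
        let remaining = x / 128
        if remaining > 0 then loop remaining (byte (digit ||| 128) :: acc)
        else List.rev (byte digit :: acc)
    loop length [] |> List.toArray

/// Assumed definition of ConnAckReturnCode (values per the MQTT 3.1.1 spec).
type ConnAckReturnCode =
    | Accepted = 0uy
    | UnacceptableProtocolVersion = 1uy
    | IdentifierRejected = 2uy
    | ServerUnavailable = 3uy
    | BadUserNameOrPassword = 4uy
    | NotAuthorized = 5uy

/// Hypothetical helper: build a complete ConnAck packet as bytes.
let connAckBytes (sessionPresent: bool) (returnCode: ConnAckReturnCode) : byte[] =
    let variableHeader = [| (if sessionPresent then 1uy else 0uy); byte returnCode |]
    Array.concat [ [| 2uy <<< 4 |]                                   // packet type 2 (ConnAck) in the upper four bits
                   encodePacketRemainingLength variableHeader.Length // always 2 for ConnAck
                   variableHeader ]

printfn "%A" (connAckBytes false ConnAckReturnCode.Accepted)
printfn "%A" (encodePacketRemainingLength 321)
```

The first line prints [|32uy; 2uy; 0uy; 0uy|], matching the "Sending:" line in the server output, and the second prints [|193uy; 2uy|], the spec's two-byte example for 321.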

With that out of the way, I can look at the Connect packet processing. The connection handshake is straightforward. I can perform client authentication and prepare the ConnAck packet. Once complete, the final steps are sending the response back to the client and updating system state. That is handled with the state.Post call. I’m going to hand wave over this part for now since the underlying implementation is involved. Just trust me for now that the serialized packet will get back to the client.

/// Process a Connection packet
let processConnect (state: StateManager) (context: ConnectionContext) (packet: RequestConnectPacket) : ProcessResult option =
    let permitConnect =
        if requireAuthentication
        then authenticate packet.Payload.UserName packet.Payload.Password
        else true

    if permitConnect then
        let variableHeader =
            { ConnAckVariableHeader.Flags = { ConnAckFlags.SessionPresent = true }
              ReturnCode = ConnAckReturnCode.Accepted }

        let variableHeader' = variableHeader.Serialize()

        let fixedHeader =
            { ConnAckFixedHeader.PacketType = PacketType.ConnAck
              RemainingLength = variableHeader'.Length }

        let connAckPacket =
            { ConnAckResponsePacket.FixedHeader = fixedHeader
              VariableHeader = variableHeader }

        // Connected
        let message = serializeResponsePacket connAckPacket
        state.Post(StateMessage.Connect(context, message))

        None
    else
        Some ProcessResult.Disconnect

At this point I’ve covered parsing of a Connect packet as well as crafting a response packet. After working with the byte-level details, I need to tie things together. There is a small detail I glossed over when discussing the original handler from the last post: how packets relate to the provided buffer data. MQTT is often used for lots of short messages, but it is entirely within the spec that they could also be longer than the Kestrel-provided segment length. So although Kestrel handles a lot of the networking dynamics, I’m not completely absolved of dealing with at least some of it. I need a way to deal with multiple packets in a memory segment or a packet being split across segments. There are a couple of ways to handle this. My preference is to keep the parsing code as clean as possible, which means I don’t want to add the complexity of multiple segments there. My approach is to write my own segment buffer abstraction, called SegmentBuffer. This will store multiple segments as they arrive, and I’ll write my own Item, Slice, and Length methods to hit the common use-cases. This abstraction incurs a small performance cost, but makes the parsing so much easier.

With that out of the way I need to rework the handler by removing the echo placeholder. I modify the handler to aggregate the buffer into my abstraction. Once the data is in a manageable structure, I can do a quick peek at the next packet. Now I can grab packet-length blocks from the buffer and pass them to the parser. Doing it this way makes the parsing easier. Considering parsing is often the messy part, it pays to simplify it as much as possible. Once parsed, I pass the packet to be processed. This is done as long as there are complete packets in the SegmentBuffer to be processed.

/// Mqtt handler - using segmentbuffer
override __.OnConnectedAsync(connection: ConnectionContext) =
    printfn "Client connected"
    let segmentBuffer = SegmentBuffer()

    async {
        try
            let mutable clientConnected = true
            while clientConnected do
                let! result = Async.AwaitTask(connection.Transport.Input.ReadAsync().AsTask())

                if (result.IsCanceled || result.IsCompleted) then
                    clientConnected <- false
                else
                    // Append all segments from the client buffer to the larger segmentbuffer
                    let buffer = result.Buffer
                    let mutable iter = buffer.GetEnumerator()
                    let mutable continueClientBuffer = true

                    while continueClientBuffer do
                        let segment = iter.Current
                        segmentBuffer.Append segment
                        continueClientBuffer <- iter.MoveNext()

                    connection.Transport.Input.AdvanceTo(buffer.End)

                    // Process the segmentBuffer
                    let mutable continueSegmentBuffer = true
                    while continueSegmentBuffer do
                        let peekResults = peekPacketLength segmentBuffer
                        match peekResults with
                        | Some (packetLength) ->
                            // Parse and process packet
                            let result =
                                packetLength
                                |> segmentBuffer.ConsumeSlice
                                |> tap printPacket
                                |> parseBufferSegment
                                |> processPacket state connection

                            match result with
                            | Some (MqttType.ProcessResult.Disconnect) -> continueSegmentBuffer <- false
                            | _ -> ()
                        | None -> continueSegmentBuffer <- false // Don't have entire packet

            // Cleanup client connection
            disconnectClient state connection |> ignore
        with
        | ex -> printfn "ERROR: %A" ex
    }
    |> Async.StartAsTask :> Task
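
The handler leans on SegmentBuffer and peekPacketLength, neither of which is shown in this post. To give a feel for the idea, here is a deliberately simplified stand-in. It is an assumption-heavy sketch, not the real implementation: it copies every segment into a single ResizeArray instead of holding on to the segments, it omits Slice, and it returns None for a malformed length where a real server would probably want to disconnect.

```fsharp
open System

/// Simplified stand-in for SegmentBuffer: one growable list of bytes.
type SegmentBuffer() =
    let data = ResizeArray<byte>()

    /// Number of buffered bytes not yet consumed
    member __.Length = data.Count

    /// Byte at the given index
    member __.Item with get (index: int) = data.[index]

    /// Add a segment to the end of the buffer
    member __.Append(segment: ReadOnlyMemory<byte>) = data.AddRange(segment.ToArray())

    /// Remove the first `length` bytes and return them
    member __.ConsumeSlice(length: int) : ReadOnlyMemory<byte> =
        let slice = data.GetRange(0, length).ToArray()
        data.RemoveRange(0, length)
        ReadOnlyMemory<byte>(slice)

/// Total length (fixed header + remaining length) of the next packet,
/// or None if the buffer does not yet hold a complete packet.
let peekPacketLength (buffer: SegmentBuffer) : int option =
    let rec readLength position multiplier value =
        // Length bytes live at positions 1 through 4
        if position >= buffer.Length || position > 4 then
            None
        else
            let encodedByte = int buffer.[position]
            let value' = value + (encodedByte &&& 127) * multiplier
            if (encodedByte &&& 128) <> 0 then
                readLength (position + 1) (multiplier * 128) value'
            else
                Some(position + 1 + value') // header bytes + remaining length
    match readLength 1 1 0 with
    | Some total when total <= buffer.Length -> Some total
    | _ -> None

// The 16-byte Subscribe packet from the server output, arriving in two pieces
let packet = [| 0x82uy; 0x0Euy; 0x1Duy; 0xD7uy; 0x00uy; 0x09uy; 0x74uy; 0x6Fuy; 0x70uy; 0x69uy; 0x63uy; 0x2Duy; 0x66uy; 0x6Fuy; 0x6Fuy; 0x00uy |]
let segmentBuffer = SegmentBuffer()

segmentBuffer.Append(ReadOnlyMemory<byte>(packet, 0, 10))
let firstPeek = peekPacketLength segmentBuffer  // packet still incomplete

segmentBuffer.Append(ReadOnlyMemory<byte>(packet, 10, 6))
let secondPeek = peekPacketLength segmentBuffer // whole packet buffered

let consumed = segmentBuffer.ConsumeSlice(16)
printfn "%A %A consumed=%d left=%d" firstPeek secondPeek consumed.Length segmentBuffer.Length
```

The point of the abstraction shows up in the two peeks: the first returns None because only part of the packet has arrived, the second returns Some 16, and the parser only ever sees one complete packet at a time.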

In the echo server example client communication was simple: read from client, write to client. As expected, MQTT is more involved. The server must maintain internal state to support multiple clients and their topic subscriptions, as well as the ability to publish messages to clients based on those subscriptions. For this I’ll use a central control function to manage these interactions, which I’ll call a StateManager. Its interface will be supplied by a MailboxProcessor. This provides me asynchronous processing capability without having to worry about data or lock contention. From the main control interface, I’ll create a function for each supported MQTT message type. This allows me to centralize all state management as well as handle proper message sending to all clients.

Taking a deeper look at state management requirements, I need a way to track connected clients and their subscriptions. I also need to track active messages (messages that are published, and possibly retained). This domain modeling is simple enough to do with F# records.

Additionally, I have moved the context.Transport.Output.WriteAsync() functionality into the state manager, specifically a writeMessage function. I typically like to keep the read and write to clients together in the main handler, but it just isn’t a reasonable option in this case. Interacting with multiple clients based on a single client’s message requires this to be buried a layer deeper in the code. But at least I can keep all client interactions in a single place. This ends up being an overall win, and the code is still reasonably clean.

module MqttState

open Microsoft.AspNetCore.Connections
open MqttType

/// Connected client metadata
type ConnectedClient =
    { ConnectionId: string
      Context: ConnectionContext
      Subscriptions: TopicFilter array }

/// Client reference object
type ActiveMessageClient =
    { ClientId: string
      Qos: byte }

/// Cached messages
type ActiveMessage =
    { Message: byte []
      Retained: bool
      Clients: ActiveMessageClient list }

/// MQTT server state data, including connected clients and message
type MqttState =
    { ConnectedClients: Map<string, ConnectedClient>
      ActiveMessages: ActiveMessage list }

/// Initial MQTT server state
let emptyMqttState =
    { MqttState.ConnectedClients = Map.empty
      ActiveMessages = [] }

/// Supported messages that update server state
type StateMessage =
    | Connect of (ConnectionContext * byte [])
    | Disconnect of ConnectionContext

/// MQTT Server state object
type StateManager = MailboxProcessor<StateMessage>

/// Write message to specific client context
let writeMessage (connection: ConnectionContext) (message: byte []) =
    async {
        printfn "Sending: %A" (message)
        let message' = System.ReadOnlyMemory<byte>(message)
        let! result = Async.AwaitTask(connection.Transport.Output.WriteAsync(message').AsTask())

        return
            if result.IsCanceled then false // Operation cancelled
            elif result.IsCompleted then false // Client connection closed
            else true // Completed successfully
    }

/// Interface to state manager
let stateManager () =
    fun (inbox: StateManager) ->
        let rec loop s =
            async {
                let! (msg: StateMessage) = inbox.Receive()

                let! s' =
                    match msg with
                    | Connect (connection, message) -> stateConnect s connection message
                    | Disconnect (connection) -> stateDisconnect s connection
                // printfn "state: %A" s'
                showState s'

                return! loop s'
            }

        loop emptyMqttState
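
If the MailboxProcessor pattern is unfamiliar, the loop above can be boiled down to a toy that runs on its own. Everything in this sketch is made up for illustration: ToyMessage, toyStateManager, and the GetClients query are not part of the server, and the state is just a set of client ids rather than the MqttState record.

```fsharp
/// Messages understood by the toy state manager (illustrative names only)
type ToyMessage =
    | Connect of string
    | Disconnect of string
    | GetClients of AsyncReplyChannel<Set<string>>

/// One agent owns the state; callers only ever post messages to it.
let toyStateManager =
    MailboxProcessor<ToyMessage>.Start(fun inbox ->
        let rec loop (clients: Set<string>) =
            async {
                let! msg = inbox.Receive()

                match msg with
                | Connect clientId -> return! loop (Set.add clientId clients)
                | Disconnect clientId -> return! loop (Set.remove clientId clients)
                | GetClients reply ->
                    reply.Reply clients
                    return! loop clients
            }

        loop Set.empty)

// Messages are handled one at a time, in the order they were posted
toyStateManager.Post(Connect "clientA")
toyStateManager.Post(Connect "clientB")
toyStateManager.Post(Disconnect "clientA")

let clients = toyStateManager.PostAndReply(fun reply -> GetClients reply)
printfn "%A" clients
```

Because the agent handles one message at a time and each iteration passes a new immutable state to the next, there is no shared mutable data to lock. That is the property the server's StateManager relies on when many connection handlers post to it at once.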

Now that the supporting structure is implemented, I can directly address how I handle connects. When a Connect message is processed, it ultimately ends up in the stateConnect function of the StateManager (remember the hand-waving from earlier?). The server needs to keep track of clients, so a ConnectedClient record is created and added to the client tracking map. It also sends a ConnAck message back to the client, letting them know they are connected. This completes the connection handshake. I’m including the client Disconnect processing as well. This is partially because it is a nice symmetry with Connect, but also because it gets woven throughout the code to handle bad messages (through parsing and processing). The typical response to bad things in MQTT is to just disconnect the client, so it is natural to include it immediately. Fwiw, there is a specific client Disconnect message as well. So, here the opposite of Connect happens: the client is removed from the ConnectedClients map.

/// Handle client connect
let stateConnect (state: MqttState) (connection: ConnectionContext) (message: byte []) =
    async {
        let! success = writeMessage connection message

        // Update state
        let client =
            { ConnectedClient.ConnectionId = connection.ConnectionId
              Context = connection
              Subscriptions = [||] }

        let client' = Map.add connection.ConnectionId client state.ConnectedClients

        return { state with ConnectedClients = client' }
    }

/// Handle client disconnect
let stateDisconnect (state: MqttState) (connection: ConnectionContext) =
    let clientId = connection.ConnectionId
    async {
        // Kill client connection
        connection.Abort()

        let clients = Map.remove clientId state.ConnectedClients

        let messages =
            state.ActiveMessages
            |> List.map (fun m ->
                let c' =
                    m.Clients
                    |> List.filter (fun c -> c.ClientId <> clientId)

                { m with Clients = c' })
            |> List.filter (fun m -> m.Clients.Length <> 0)

        return { state with
                    ConnectedClients = clients
                    ActiveMessages = messages }
    }


Once the state manager is fully defined, it gets added to the MqttConnectionHandler class. You may have noticed earlier, it gets passed into the packet processing function. This singleton is what allows me to have a shared server state that can be easily managed.

/// mqtt server state
static let mutable state = MailboxProcessor.Start(stateManager ())

At this point the server can handle a Connect message from the client. This includes adding the client to its internal state handling as well as responding with the proper ConnAck message. Now is a good time to see this in action. From the server side, I can see the client network connection as well as reception and consumption of its MQTT Connect packet. Upon proper connection it sends a ConnAck back to the client. From the client side, it sends the Connect and receives a ConnAck. Great! This is the short-term goal, and I’d say mission accomplished. This is also a good time to point out that my test client is prepped for a fully functional interaction, so after connection it attempts to subscribe to a topic. Obviously this hasn’t been implemented yet, so the server catches the unsupported packet error and disconnects the client. I guess I know what I need to work on next.

# Server
$ dotnet run
Content root path: /home/codesuji/projects/mqtt-server/src
Now listening on: http://localhost:1883
Application started. Press Ctrl+C to shut down.
Client connected
PacketLength: Some 60
Packet (60): Length: 60 Data: 10 3A 00 04 4D 51 54 54 04 C6 00 64 00 0F 6D 71 74 74 6A 73 5F 64 37 39 66 66 66 31 65 00 06 69 2D 64 69 65 64 00 07 63 6C 69 65 6E 74 41 00 05 75 73 65 72 31 00 05 70 61 73 73 31
PacketType: Connect
Sending: [|32uy; 2uy; 0uy; 0uy|]
State:
Clients:
0HM4JKJV9HHNF Subscriptions:
Messages:
PacketLength: Some 16
Packet (16): Length: 16 Data: 82 0E 1D D7 00 09 74 6F 70 69 63 2D 66 6F 6F 00
PacketType: Subscribe
ERROR: Unsupported packet type: Subscribe
ERROR: Invalid packet data. Disconnecting...

# Client
$ node mqttclient.js
Connected Packet {
  cmd: 'connack',
  retain: false,
  qos: 0,
  dup: false,
  length: 2,
  topic: null,
  payload: null,
  sessionPresent: false,
  returnCode: 0
}
Connection closed.

At this point I have the server supporting very basic MQTT functionality, specifically Connect messages. As is often the case, the beginning of a project includes a lot of setup and yak shaving before I can get to the ultimate, or even preliminary, goal. But this process is also informative. It is a view into how system state and basic MQTT types can be modeled in F#. I’m probably a little biased, but F#’s ability to model proper state and structure is a definite asset, and makes refactoring through this process a breeze. With my goal met, this is a good stopping point. I hope you found this post informative. Next time I’ll venture further into the MQTT spec and tackle Subscribe and Publish.