The Power of the Forward Pipe

Read Time: 8 minutes

… or Why I <3 the |>

This is a light post, but I wanted to provide some advocacy for the |> (forward pipe). To F# developers, it is a well-known operator that hardly needs introduction or promotion. That said, it can be underappreciated that F# provides native tooling that makes data processing pipelines easy to build. If F# is the ETL language, then |> is the glue that holds all of the processing components together.

When talking to people about F#, a recurring theme is readability. It is worth leveraging this strength. Using |> to implement data transformations and processing is an intuitive way to enhance readability. It encourages a natural reading of the flow of data through the application. It can be simply described as: the data flows from top to bottom, being pushed through the pipes as directed. F#’s indentation aids in calling out sub-processing when necessary. Admittedly, this concept isn’t unique to F#. But the constructs of the language make this a natural approach. Most languages would be hard pressed to match such a clean and readable syntax for such common, and sometimes involved, tasks. In the end, it is not just about accomplishing a goal, but ensuring that future developers can easily decipher the intent. I believe |> assists in that result. With all that said, it is time to show some code.
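As a quick warm-up, here is a minimal sketch of the readability argument. The helper names (square, isOdd) are hypothetical and exist only for this illustration. Since x |> f is simply f x, both bindings compute the same value; only the reading order differs.

```fsharp
// Hypothetical helpers, used only for illustration.
let square (n: int) = n * n
let isOdd (n: int) = n % 2 = 1

// Nested calls read inside-out.
let nested = List.sum (List.map square (List.filter isOdd [1 .. 10]))

// The piped form reads top to bottom, in the order the work happens.
let piped =
    [1 .. 10]
    |> List.filter isOdd
    |> List.map square
    |> List.sum

printfn "%d %d" nested piped
```

Both values are the sum of the squares of the odd numbers from 1 to 10.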

First, I’ll make an arbitrary dataset. To keep things simple, it is a sequence of tuples (x, y), where x is a random value between 0 and 10, and y is cos(x). See, I’m using it already :) It truly is a workhorse for pushing data, but this is hardly exciting.

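open System

// A sequence of (x, cos x) pairs, with x drawn from [0, 10).
// Note: Seq.map is lazy, so each enumeration of rows draws fresh random values.
let rows =
    let r = new System.Random()
    [0 .. 10000]
    |> Seq.map (fun _ ->
        let x = r.NextDouble() * 10.
        (x, Math.Cos(x)))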

Any good ETL process needs some transformation and filtering functions. In a real program, these would be the main blocks of business logic. Here, filtering keeps only rows whose y value floors to an even number, and the transformation is y' = (y + x) + 2. I also use this as an opportunity to use composition to chain transformations. F#’s function composition operator shares a linguistic theme: the >> also shows an underlying movement of data. I realize function composition isn’t typically described in such terms, but ultimately a similar intent is being accomplished. Additionally, I make a functional version of floor because, to my functional sensibilities, it feels cleaner.

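// Functional wrapper around Math.Floor (shadows F#'s built-in floor)
let floor (x:float) = Math.Floor(x)

// Row filter: keep rows whose floored y is even
let isEven (x:float) = (floor x) % 2. = 0.
let filterRow (x:float, y:float) = isEven y

// Row transforms, chained with >> to give y' = (y + x) + 2
let xPlusY (x:float, y:float) = (x, y + x)
let yPlus2 (x:float, y:float) = (x, y + 2.)
let transformRow = (xPlusY >> yPlus2)

// Collapse a group (key, rows) into (key, average y)
let averageYs (x:float, items:seq<float*float>) =
    let avg =
        items
        |> Seq.averageBy (fun (_, y) -> y)
    (x,avg)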
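To make the behavior of these helpers concrete, here is a small sketch that applies them to hand-picked rows. The definitions are repeated so the snippet runs on its own, and the sample values are made up for illustration. One detail worth noticing: because y is a cosine, it always lies in [-1, 1], so the filter ends up keeping exactly the rows where 0 <= y < 1.

```fsharp
open System

// Same definitions as in the post, repeated so the sketch is self-contained.
let floor (x: float) = Math.Floor(x)
let isEven (x: float) = (floor x) % 2. = 0.
let filterRow (x: float, y: float) = isEven y
let xPlusY (x: float, y: float) = (x, y + x)
let yPlus2 (x: float, y: float) = (x, y + 2.)
let transformRow = xPlusY >> yPlus2

// f >> g means "apply f, then g", so this is yPlus2 (xPlusY (1.0, 0.5)).
let sample = transformRow (1.0, 0.5)        // (1.0, 3.5)

// floor 0.88 = 0.0, which is even, so the row is kept.
let keptPositive = filterRow (6.77, 0.88)   // true

// floor -0.62 = -1.0 and -1.0 % 2. = -1.0, so the row is dropped.
let keptNegative = filterRow (2.24, -0.62)  // false

printfn "%A %b %b" sample keptPositive keptNegative
```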

Now it is time to process the dataset. I use a series of filtering, transforms, and grouping. This is a simple example, but what I enjoy is how clean it is to reason about. The data flows from top to bottom. filter and map perform row-level processing while data-set level grouping is just as easy. The syntax is light enough to get out of the way, but descriptive enough to be instructive.

let transformedRows =
    rows
    // Filter
    |> Seq.filter filterRow
    // Transform
    |> Seq.map transformRow
    // Dataset-level grouping
    |> Seq.groupBy (fun (x, _) -> floor x)
    |> Seq.sortBy (fun (x, _) -> x)
    // Results (x, avg(y))
    |> Seq.map averageYs

Since it is always helpful to see the data, here is what my test set and results look like. Have I mentioned how useful F#’s REPL is when doing data processing?

> rows
- ;;
val it : seq<float * float> =
  seq
    [(6.769300023, 0.88415471); (2.243308668, -0.6229532528);
     (0.623326558, 0.8119411099); (9.159116824, -0.9649191321); ...]

> transformedRows |> Seq.iter (printfn "%A")
- ;;
(0.0, 3.34397848)
(1.0, 3.563359213)
(4.0, 6.996100278)
(5.0, 8.178838086)
(6.0, 9.440792772)
(7.0, 9.827865885)

This is all well and good, not even really that exciting (is cleanliness anticlimactic?). But I can take this a little further. Let’s say I want to introduce parallelism, but don’t want to sacrifice readability. If I use arrays instead, I can use Array.Parallel.map. I get parallelism with the smallest of modifications.

let rowsArray = Seq.toArray rows
let transformedRowsArray =
    rowsArray
    // Filter
    |> Array.filter filterRow
    // Transform
    |> Array.Parallel.map transformRow
    // Dataset-level grouping
    |> Array.groupBy (fun (x, _) -> floor x)
    |> Array.sortBy (fun (x, _) -> x)
    // Results (x, avg(y))
    |> Array.Parallel.map averageYs
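If you want to convince yourself that the swap is safe, here is a minimal sketch comparing Array.map and Array.Parallel.map. The slowSquare function is a hypothetical stand-in for an expensive row transform; the sleep is only there to simulate work.

```fsharp
// Hypothetical stand-in for an expensive row transform.
let slowSquare (n: int) =
    System.Threading.Thread.Sleep 10
    n * n

let input = [| 1 .. 20 |]

let sequential = input |> Array.map slowSquare
let parallelResult = input |> Array.Parallel.map slowSquare

// Array.Parallel.map returns results in input order, so both arrays are equal.
let same = (sequential = parallelResult)

printfn "%b" same
```

The results match element for element; only the scheduling of the work changes. This holds as long as the mapped function has no side effects that depend on execution order.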

It is nice to have options; here is another one, ParallelSeq. This requires a package, but it’s a pretty simple inclusion. Again, parallelism with minimal changes. Do you sense a theme yet?

// Run from the shell first:
//   paket add FSharp.Collections.ParallelSeq

#r "./packages/FSharp.Collections.ParallelSeq/lib/net45/FSharp.Collections.ParallelSeq.dll"
open FSharp.Collections.ParallelSeq

let transformedRowsArray2 =
    rowsArray
    // Filter
    |> PSeq.filter filterRow
    // Transform
    |> PSeq.map transformRow
    // Dataset-level grouping
    |> PSeq.groupBy (fun (x, _) -> floor x)
    |> PSeq.sortBy (fun (x, _) -> x)
    // Results (x, avg(y))
    |> PSeq.map averageYs

Oh, but there is more. Conveniently, async is provided natively as another way to get parallelism. This takes a couple of extra steps, but it is worth the effort. First, decide what to parallelize. For the sake of this post, filtering and row transformation will be done in parallel. Second, make an async transform function. This requires a minor adaptation of the code: extract that part of the process and wrap it in an async { ... }. Well, that was easy.

let transformRowsAsync (rows) = async {
    return
        rows
        |> Seq.filter filterRow
        |> Seq.map transformRow
        // Seq functions are lazy; force evaluation so the work runs inside the async
        |> Seq.toArray
}

Once that is in place, the data needs to be broken into segments that can be run in parallel. Luckily Seq.chunkBySize does exactly what I need. It converts a sequence into a sequence of arrays, each of the requested size except possibly the last, so the load stays roughly balanced. I then feed these chunks into Async.Parallel and wait for them to complete. Once they are done processing, I concat them back into a single sequence for dataset-level processing. As you can imagine, expensive transforms would benefit from this parallelism. And the dataflow hardly changes. It’s still easy to read and reason about.

let transformedRowsAsync =
    rows
    // Split into subsets
    |> Seq.chunkBySize 10
    |> Seq.map (fun subList -> transformRowsAsync subList)
    // Parallel processing
    |> Async.Parallel
    |> Async.RunSynchronously
    // Put back together
    |> Seq.concat
    // Do set-level processing
    |> Seq.groupBy (fun (x, _) -> floor x)
    |> Seq.sortBy (fun (x, _) -> x)
    // Results (x, avg(y))
    |> Seq.map averageYs
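The chunk-and-gather pattern is easier to see on a tiny input. The following is a minimal sketch, not part of the pipeline above; sumChunkAsync is a hypothetical stand-in for per-chunk work. It shows two details: the final chunk can be shorter than the rest, and Async.Parallel returns its results in the same order as the inputs.

```fsharp
// Seven items in chunks of three: the last chunk holds the remainder.
let chunks = [1 .. 7] |> Seq.chunkBySize 3 |> Seq.toList
// chunks = [ [|1; 2; 3|]; [|4; 5; 6|]; [|7|] ]

// Hypothetical stand-in for per-chunk work.
let sumChunkAsync (chunk: int[]) = async { return Array.sum chunk }

let sums =
    chunks
    |> Seq.map sumChunkAsync
    // Run every chunk's async, gathering results in input order
    |> Async.Parallel
    |> Async.RunSynchronously
// sums = [|6; 15; 7|]

printfn "%A" sums
```

Because the result order matches the chunk order, concatenating the results puts the rows back in their original sequence.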

These are just some of the native ways data flows can be represented. To work with real data, pair this with F#’s Data Type Providers, and this can be a strong but simple paradigm for data processing. I think this can take most people a long way. But what if you need more? Then you can take the extra step to use frameworks like MBrace, Akka.NET, and Hopac. Hopefully this provides a bit of inspiration to embrace the |> and leverage the power of F# for ETL in the future. Until next year…