
F# Agent Pipeline Processing in Concurrent Applications

How can we use the F# Agent to set up a pipeline which can be used in Concurrent Applications? We need not only a working version but also a declarative version which we can easily extend and maintain.

Introduction

In the context of Concurrent Applications, there are several architectural models that describe how the parts of a system communicate. The Actor Model, or Agent-Based Architecture, is one of those models we can use to write robust Concurrent Applications.

One of the challenges in Concurrent Programming is that several computations want to communicate with each other and share state safely.

F# has implemented this model (or at least a part of it) with the MailboxProcessor, or in its shortened form: Agent. See Tomas Petricek’s blog post for more information.

Agent Pipeline

Introduction

Communication between agents is a lot harder to comprehend than communication in a sequential or even a parallel system. Agents communicate with each other by sending messages. An agent can alter its own private state but no “external” state; it shares state by sending messages to other agents.

[Figure: basic agent example]
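To make the idea of private state concrete, here is a minimal sketch of a counter agent. The names CounterMsg and counter are made up for this illustration; only the agent's own loop ever touches the count, and everyone else has to ask for it with a message.

```fsharp
// Hypothetical message type for this illustration.
type CounterMsg =
    | Increment
    | Get of AsyncReplyChannel<int>

let counter =
    MailboxProcessor<CounterMsg>.Start(fun inbox ->
        // 'count' is the agent's private state, threaded through the loop.
        let rec loop (count : int) = async {
            let! msg = inbox.Receive()
            match msg with
            | Increment ->
                return! loop (count + 1)
            | Get channel ->
                // The state leaves the agent only as a reply message.
                channel.Reply(count)
                return! loop count }
        loop 0)

counter.Post(Increment)
counter.Post(Increment)
// Messages are handled in the order they were queued, so this yields 2.
let current = counter.PostAndReply(fun channel -> Get channel)
printfn "%d" current
```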

This isolation of agents as Active Objects can increase the system's complexity. In this post, I will walk through a brain-dump exercise of mine to create an Agent Pipeline. I find a pipeline an easier way to think about agents, because we keep that feeling of “sequential”.

[Figure: agent pipeline example]

In F# this is rather easy to express, so let’s try it!

F# Agent Template

First, let’s define the agent we are going to use. Our agent must receive a message, but must also send some kind of response, so the next agent can process it. The agent itself is an asynchronous computation, so the result of the message will be wrapped inside an Async type.

Our signature of calling our pipeline agent should thereby be something like this:

'a -> Async<'a>

Ok, let’s define some basic prerequisites and our basic agent template. We’ll use a basic string as our message just for example purposes.
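A minimal sketch of what such prerequisites could look like. The names Agent, Message, startAgent and send are assumptions chosen for this illustration, not part of the F# library; only MailboxProcessor, AsyncReplyChannel and PostAndAsyncReply are.

```fsharp
// "Agent" is the customary short alias for MailboxProcessor.
type Agent<'a> = MailboxProcessor<'a>

// A message is the string payload plus the channel used to send the reply back.
type Message = string * AsyncReplyChannel<string>

// Hypothetical template: start an agent from a body that is filled in later.
let startAgent (body : Agent<Message> -> Async<unit>) : Agent<Message> =
    Agent.Start(body)

// Hypothetical helper giving the call shape we are after: string -> Async<string>.
let send (agent : Agent<Message>) (msg : string) : Async<string> =
    agent.PostAndAsyncReply(fun channel -> (msg, channel))
```

Partially applying send to an agent gives a function of type string -> Async<string>, which is the signature described above with 'a fixed to string.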

Now that we have our basic setup, we can look at the body of our agent. We must receive a message and send back a reply. This can be done with the Receive() method, which for our message type yields a tuple of the message itself and the channel we have to send our reply to. The call to Receive() suspends the loop (asynchronously, without tying up a thread) until the next message arrives. Agents run on a single logical thread, and all messages sent to an agent are queued.

The body can be defined like this (print function to simulate the processing):
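A minimal sketch of such a body, assuming the message is a tuple of a string and its reply channel. The createAgent name and the reply format (the incoming message with the agent's name appended) are choices made for this illustration.

```fsharp
type Agent<'a> = MailboxProcessor<'a>
type Message = string * AsyncReplyChannel<string>

// Hypothetical factory: every agent gets a name so we can see who handled what.
let createAgent (name : string) : Agent<Message> =
    Agent.Start(fun (inbox : Agent<Message>) ->
        let rec loop () = async {
            // Wait (asynchronously) for the next queued message.
            let! (msg, channel) = inbox.Receive()
            // The print stands in for real processing.
            printfn "[%s] processing: %s" name msg
            // Reply so that the caller's Async<string> completes.
            channel.Reply(sprintf "%s -> %s" msg name)
            return! loop () }
        loop ())

let agent = createAgent "agent1"
let reply =
    agent.PostAndAsyncReply(fun channel -> ("hello", channel))
    |> Async.RunSynchronously
printfn "%s" reply   // hello -> agent1
```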

F# Async Binding

Ok, now we have our basic agent; we can look at how we can bind agents together.

Just like in the previous diagram, I would like to express in code how messages are “piped” to other agents. When we think of piping, we can think of two approaches: Applicative (<*>) and Monadic (>>=). Since we need the result of the previous call in our next call, I’m going to use the Monadic style (>>=).

Looking at the signature, we see that we must bind two separate worlds: the world of strings and the world of Async. The signature alone makes me want to write some bind functions, so before we go any further, let’s define some helper functions for our Async world:
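A minimal sketch of two such helpers, plus an operator alias for the second one. The names retn and bind are assumptions for this illustration; the bodies only rely on the built-in async computation expression.

```fsharp
// Lift a plain value into the Async world: 'a -> Async<'a>.
let retn (x : 'a) : Async<'a> =
    async { return x }

// Run the Async, then feed its result to the next Async-producing function.
let bind (f : 'a -> Async<'b>) (m : Async<'a>) : Async<'b> =
    async {
        let! x = m
        return! f x }

// Infix form of bind, with the Async value on the left.
let (>>=) (m : Async<'a>) (f : 'a -> Async<'b>) : Async<'b> =
    bind f m

// Small usage example with ordinary functions instead of agents.
let shout (s : string) : Async<string> = async { return s.ToUpper() }
let exclaim (s : string) : Async<string> = async { return s + "!" }

let result = retn "hello" >>= shout >>= exclaim |> Async.RunSynchronously
printfn "%s" result   // HELLO!
```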

These two functions should be enough to define our pipeline. Look at the signature of our bind:

('a -> Async<'b>) -> Async<'a> -> Async<'b>

This is just what we want in our agent signature. Now, I’m going to create some agents to simulate a pipeline:
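A minimal sketch of such a pipeline, assuming three agents that each append their own name to the message. The names createAgent, agent1, agent2, agent3 and pipeline are made up for this illustration, and the helpers are repeated so the snippet runs on its own.

```fsharp
type Agent<'a> = MailboxProcessor<'a>
type Message = string * AsyncReplyChannel<string>

let retn (x : 'a) : Async<'a> = async { return x }
let bind (f : 'a -> Async<'b>) (m : Async<'a>) : Async<'b> =
    async {
        let! x = m
        return! f x }
let (>>=) m f = bind f m

// Hypothetical factory: starts an agent and returns the function that calls it,
// so each pipeline stage has the shape string -> Async<string>.
let createAgent (name : string) : string -> Async<string> =
    let agent =
        Agent.Start(fun (inbox : Agent<Message>) ->
            let rec loop () = async {
                let! (msg, channel) = inbox.Receive()
                printfn "[%s] processing: %s" name msg
                channel.Reply(sprintf "%s -> %s" msg name)
                return! loop () }
            loop ())
    fun msg -> agent.PostAndAsyncReply(fun channel -> (msg, channel))

let agent1 = createAgent "agent1"
let agent2 = createAgent "agent2"
let agent3 = createAgent "agent3"

// The reply of each agent becomes the message for the next one.
let pipeline (msg : string) : Async<string> =
    retn msg >>= agent1 >>= agent2 >>= agent3

let output = pipeline "start" |> Async.RunSynchronously
printfn "%s" output   // start -> agent1 -> agent2 -> agent3
```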

Note that the pipeline of the agents is almost exactly what we designed in our diagram. This is one of the many reasons that I like F# so much. Much more than in C#, you can express declaratively exactly how you see the problem. C#'s async/await is inspired by F#'s Asynchronous Workflows.

[Figure: monadic agent pipeline example]

Or, if you prefer, we can use a Kleisli style (which I like to use sometimes). This way we don’t have to wrap the initial message in an Async:
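A minimal sketch of Kleisli composition over Async, using the conventional >=> operator. As before, createAgent and the agent names are assumptions for this illustration, and each agent appends its own name to the message.

```fsharp
type Agent<'a> = MailboxProcessor<'a>
type Message = string * AsyncReplyChannel<string>

// Kleisli composition: glue two Async-returning functions into one.
let (>=>) (f : 'a -> Async<'b>) (g : 'b -> Async<'c>) : 'a -> Async<'c> =
    fun x -> async {
        let! y = f x
        return! g y }

// Hypothetical factory: starts an agent and returns the function that calls it.
let createAgent (name : string) : string -> Async<string> =
    let agent =
        Agent.Start(fun (inbox : Agent<Message>) ->
            let rec loop () = async {
                let! (msg, channel) = inbox.Receive()
                printfn "[%s] processing: %s" name msg
                channel.Reply(sprintf "%s -> %s" msg name)
                return! loop () }
            loop ())
    fun msg -> agent.PostAndAsyncReply(fun channel -> (msg, channel))

let agent1 = createAgent "agent1"
let agent2 = createAgent "agent2"
let agent3 = createAgent "agent3"

// The pipeline is itself a string -> Async<string>; no lifting of the input needed.
let pipeline : string -> Async<string> =
    agent1 >=> agent2 >=> agent3

let output = pipeline "start" |> Async.RunSynchronously
printfn "%s" output   // start -> agent1 -> agent2 -> agent3
```

Because the composed pipeline has the same shape as a single stage, it can be used as a stage in a larger pipeline.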

Conclusion

“Functional programming is the most practical way to write concurrent programs. Trying to write concurrent programs in imperative languages isn’t only difficult, it leads to bugs that are difficult to discover, reproduce, and fix”

– Riccardo Terrell (Functional Concurrency)

This is just a brain-dump of an exercise I did to practice Monadic Binds and Agents, and how to combine them. What I really learned is to look at the signature itself. Much more than in Object-Oriented languages, the signature isn’t a lie and tells you exactly what a function does. Just by looking at the signature, you can make a good guess at what the function will look like.

Functional Programming is still a bit strange at first if you come from an Object-Oriented world, but trust me, it’s worth learning. In a future where asynchronous, parallel and concurrent programming are considered “mainstream”, Functional Programming will become more and more mainstream as well.
