This post is the first in a series about how I wrote the equivalent of Go(lang) channels in JavaScript.
I did this mainly for fun: I really like Go's concurrency model, and as a daily JS developer I thought recreating it in JS would be a good challenge.
Whether you already know Go's concurrency model or not isn't important: I'll show very simple examples along the way, and it's a good opportunity to learn a different paradigm from Promise and async/await.
Furthermore, I make extensive use of JS's generator functions, and of some other fun things such as WeakMap or async iterables (with for await ... of), so you might also learn a few things about JS!
Now let's start with a short presentation of Go channels and their basic usage.
Go channels
Go has a really simple way of starting new threads (very lightweight threads called goroutines) with the go keyword:
func main() {
    go func1()
    ...
}
func func1() { ... }
In the above example, func1() will start in a new goroutine and execute concurrently with main().
The equivalent in JS would be calling an async function without using await:
async function main() {
  func1()
  ...
}
async function func1() { ... }
When several functions execute concurrently in JS, it is safe for them to share memory.
This is one of the nice properties of JS's event loop: only one piece of code runs at a time, so as a developer you don't have to protect memory against simultaneous access.
This differs from Go where, if you use shared memory, you must protect it with a mutex to ensure that only one goroutine at a time accesses a given variable.
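Here is a small sketch of what that safety looks like in practice. The worker() function and the counter variable are made up for the illustration. Two async functions increment the same variable and interleave at every await, yet no update is lost, because the read and the write happen in one uninterrupted piece of synchronous code:

```javascript
// Shared state, mutated by two concurrently running async functions.
let counter = 0

async function worker(times) {
  for (let i = 0; i < times; i++) {
    // Read-modify-write with no await in between: nothing else can run here.
    const current = counter
    counter = current + 1
    // Hand control back to the event loop so the other worker can progress.
    await null
  }
}

async function main() {
  // Both workers are started before either one finishes, so they interleave.
  await Promise.all([worker(1000), worker(1000)])
  return counter
}

const total = main()
total.then(value => console.log(value)) // 2000
```

Note that this only holds as long as the read and the write aren't separated by an await: if they were, another function could run in between and the update could be lost.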
However, Go also lets you avoid shared memory altogether, and that's where channels come in handy:
func main() {
    ch := make(chan int) // Create an integer channel
    go send123(ch) // Start send123() in a new goroutine
    // Receive an integer from ch and print it to stdout 3 times
    fmt.Println(<-ch)
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}
func send123(ch chan int) {
    // Send 3 integers to ch
    ch <- 1
    ch <- 2
    ch <- 3
}
In the above example, main() creates an integer channel, starts send123() in a new goroutine, and then receives an integer from the channel and prints it to standard output 3 times.
send123() sends 3 integers to the channel. On each side, the channel operations are blocking (main() will block until send123() sends an integer, and vice versa).
As you can see, channels are pretty simple to use, and they let us avoid sharing memory between main() and send123(), except for the channel reference.
So basically, I just told you that channels are useless in JS 🤪! They solve the problem of shared memory, which doesn't exist in JS...
But! There's more to channels. They offer a different way of programming concurrent applications, which might feel more natural to a lot of people.
They also come with some interesting concurrency patterns such as:
- Pipeline
- Fan in
- Fan out
- Timeout
Finally, I already said this but I'm going to repeat myself: I did this mainly for fun!
Now let's start exploring how we might implement channels in JS...
Implementing channels in JS
We now have a rough idea of what Go channels are and how they are used, but how could we implement the same thing in JS?
We could use async/await in a few different ways:
- Use a plain object to hold the state of a channel, and define send() and recv() async functions which would take a channel's state as first argument
- Define a Channel prototype or ES6 class which would hold the state of a channel and have send() and recv() async methods
Or we could use generator functions and the yield keyword to hide the magic away: channels would be just references, and we would yield receive and send operations on these references.
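To make "yielding an operation" a bit more concrete, here is a tiny sketch driven by hand with next(). The recv() factory and the 'ch#1' reference are placeholders of mine, not the final API: the point is only that the generator describes what it wants, and whoever drives it decides what comes back.

```javascript
// Hypothetical operation factory: it only describes what we want to do.
const recv = ch => ({ kind: 'recv', ch })

function* double(ch) {
  // The generator pauses here until whoever drives it passes a value back.
  const value = yield recv(ch)
  return value * 2
}

const it = double('ch#1')

// First step: the generator hands us the operation it wants executed.
const step1 = it.next()
console.log(step1.value) // { kind: 'recv', ch: 'ch#1' }

// Second step: we decide what the yield expression evaluates to.
const step2 = it.next(21)
console.log(step2) // { value: 42, done: true }
```

The generator itself never knows how the value was obtained, which is exactly the kind of magic we want to hide.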
I won't really discuss the pros and cons of each solution here. I chose generator functions for the following reasons:
- Generator functions (and the yield keyword) will allow us to implement channels in a way that works closer to Go channels
- A friend and I have been developing a generator functions runner framework called Cuillere (🥄 spoon in French) and I love using it 😁
So now that I have decided to use generator functions, we can imagine what the final result would look like.
Let's transpose our Go example with send123():
function* main() {
  const ch = yield chan() // Create a channel
  yield fork(send123, ch) // Start send123()
  // Receive a value from ch and log it to console 3 times
  console.log(yield recv(ch))
  console.log(yield recv(ch))
  console.log(yield recv(ch))
}
function* send123(ch) {
  // Send 3 integers to ch
  yield send(ch, 1)
  yield send(ch, 2)
  yield send(ch, 3)
}
Pretty cool! And using the yield keyword we are going to make sure that recv() and send() operations are blocking, just like in Go! (main() blocks until send123() sends an integer, and vice versa).
There are two additional operations:
- chan() creates a new channel
- fork() starts the execution of a generator function without waiting for it to complete (this one might look familiar if you are a redux-saga user)
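If you wonder how a runner can make one yielded operation blocking and another one non-blocking, here is a minimal sketch. It is not how cuillere works internally: run(), sleep() and the kind property are names I made up for the illustration, and I use a timer instead of channels to keep it short.

```javascript
// Hypothetical operation factories (names chosen for this sketch only).
const sleep = ms => ({ kind: 'sleep', ms })
const fork = (fn, ...args) => ({ kind: 'fork', fn, args })

// Drives a generator: executes each yielded operation, then resumes it.
async function run(fn, ...args) {
  const it = fn(...args)
  let step = it.next()
  while (!step.done) {
    const op = step.value
    let result
    if (op.kind === 'sleep') {
      // Blocking operation: the generator stays paused until the timer fires.
      result = await new Promise(resolve => setTimeout(resolve, op.ms))
    } else if (op.kind === 'fork') {
      // Non-blocking operation: start the other generator, don't await it.
      result = run(op.fn, ...op.args)
    } else {
      throw new TypeError(`unknown operation: ${String(op.kind)}`)
    }
    step = it.next(result)
  }
  return step.value
}

const log = []

function* child() {
  log.push('child start')
  yield sleep(10)
  log.push('child end')
}

function* main() {
  yield fork(child)
  log.push('main continues')
  yield sleep(30)
  log.push('main end')
}

const done = run(main)
done.then(() => console.log(log))
// [ 'child start', 'main continues', 'child end', 'main end' ]
```

main() carries on right after the fork, while each sleep keeps its own generator paused: that is the behavior we want from fork() on one side, and from recv() and send() on the other.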
Great! We have defined what we want, our goal now is to implement all the hidden mechanics which will allow this code to work.
But before diving into this, let me give you a short presentation of what cuillere is and how it is going to help us.
🥄 Cuillere! (queeyeah!)
CuillereJS is an extensible asynchronous execution framework based on generator functions.
So you are probably wondering why we created this Cuillere thing in the first place...
Well the goal of Cuillere is to abstract some inevitable technical complexity in plugins, and keep business code as simple and focused as possible.
At the time, we were working on a NodeJS/GraphQL backend API with a PostgreSQL database, and in order to ensure the integrity of our data we had to manage transactions properly.
I won't get into the details here, but managing transactions soon became a real pain for two reasons:
- using a callback function every time we had to ensure a transaction was opened
- passing the "transaction aware" PostgreSQL client everywhere we had to perform a database operation
Our code had become overly complex just because of transactions management... It was hard to read and often buggy!
That is when we created Cuillere, which allowed us to strip our business code of all the complexity of transaction management.
Here is a simple example of how cuillere is used with PostgreSQL:
const cuillere = require('@cuillere/core')
const {
  poolMiddleware, transactionMiddleware, queryMiddleware
} = require('@cuillere/postgres')
const cllr = cuillere(
  poolMiddleware({ /* postgres config */ }), // Manages connection pool
  transactionMiddleware(), // Manages transactions
  queryMiddleware() // Executes queries
)
const addUserAddress = (userId, address, setDefault) => cllr.call(function*() {
  const res = yield query({
    text: `INSERT INTO addresses (userId, street, postalcode, city)
           VALUES ($1, $2, $3, $4)
           RETURNING *`,
    values: [userId, address.street, address.postalCode, address.city]
  })
  if (setDefault) {
    const addressId = res.rows[0].id
    yield query({
      text: `UPDATE users
             SET defaultaddressid = $1
             WHERE userid = $2`,
      values: [addressId, userId]
    })
  }
})
As you can see, the business code doesn't have to manage transactions, nor manipulate PostgreSQL clients.
It is all taken care of by the plugins!
The plugins are responsible for executing the operations yielded by the business code.
Cuillere gives them a context object, which they can use to store state or communicate for example.
I will probably write some more about Cuillere itself in the future, for now let's focus back on our JS channels...
Implementing channels
We are going to need a channel plugin to give to Cuillere, which will take care of channel operations (creation, sending and receiving):
const cllr = cuillere(
  channelMiddleware()
)
cllr.call(function* () {
  const ch = yield chan()
  // ...
})
It is in this channelMiddleware that we are going to implement all the mechanics to make channels work as expected.
The signature of a cuillere middleware is pretty simple (the first level of currying isn't necessary, but it is a convention to have a factory):
export const channelMiddleware = () => (next, ctx) => async operation => {
  // ...
}
The middleware has a first level with the next and ctx arguments:
- next is the next middleware and should be called when the current middleware doesn't know how to handle the received operation
- ctx is the context object
The second level of the middleware has the operation argument, and is called each time an operation is yielded.
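To get a feeling for how such middlewares fit together, here is a sketch of a chain built by hand. To be clear, compose() is a helper I wrote for this illustration and not cuillere's implementation, and the two toy middlewares are made up as well. They only follow the same (next, ctx) => async operation => ... shape.

```javascript
// Hypothetical helper: chains middlewares so each one can delegate to the next.
// This is NOT cuillere's implementation, only an illustration of the signature.
function compose(middlewares, ctx) {
  // The last link of the chain rejects operations nobody handled.
  const unhandled = async operation => {
    throw new TypeError(`unhandled operation: ${JSON.stringify(operation)}`)
  }
  return middlewares.reduceRight((next, mw) => mw(next, ctx), unhandled)
}

// Toy middleware: handles 'upper' operations, delegates everything else.
const upperMiddleware = () => (next, ctx) => async operation => {
  if (operation.kind === 'upper') return operation.text.toUpperCase()
  return next(operation)
}

// Toy middleware: counts operations in the shared context, then delegates.
const countMiddleware = () => (next, ctx) => async operation => {
  ctx.count = (ctx.count || 0) + 1
  return next(operation)
}

const ctx = {}
const handle = compose([countMiddleware(), upperMiddleware()], ctx)

const result = handle({ kind: 'upper', text: 'spoon' })
result.then(value => console.log(value, ctx.count)) // SPOON 1
```

Each middleware either handles the operation itself or passes it along with next(), and they all see the same ctx object.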
Let's start by implementing all the mechanics for creating a channel.
Channel creation
The first thing we need is a factory for channel creation operations:
const CHAN = Symbol('CHAN')
export const chan = () => {
  return {
    [CHAN]: true
  }
}
We use an unexported Symbol to mark the operation and be able to recognize it in the middleware:
export const channelMiddleware = () => (next, ctx) => async operation => {
  if (operation[CHAN]) {
    // Create channel
  }
  return next(operation)
}
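If you are not familiar with symbols, this little stand-alone sketch (no cuillere involved) shows why an unexported Symbol makes a good marker: code that doesn't hold the symbol can't produce a matching property by accident.

```javascript
const CHAN = Symbol('CHAN')
const chan = () => ({ [CHAN]: true })

const operation = chan()

// The middleware can recognize the operation through the symbol...
console.log(operation[CHAN]) // true

// ...but the symbol key doesn't show up as a regular property...
console.log(Object.keys(operation)) // []
console.log(JSON.stringify(operation)) // {}

// ...and a symbol created elsewhere with the same description doesn't match.
const forged = { [Symbol('CHAN')]: true }
console.log(forged[CHAN]) // undefined
```

It isn't a security feature (Object.getOwnPropertySymbols() can still list the key), but it is enough to avoid name clashes with other plugins' operations.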
As I said earlier we want the channels to be just references. We could use empty plain objects, but for debugging purposes let's use something a little less opaque:
let nextChanId = 1
const chanKey = () => new String(`chan #${nextChanId++}`)
The String constructor gives us a unique reference (new String('foo') !== new String('foo') is always true), with a ready to use toString() method.
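String objects are rarely used, so here is a quick sketch of how they behave compared to primitive strings:

```javascript
const a = new String('chan #1')
const b = new String('chan #1')

console.log(a === b) // false: two distinct objects
console.log(a == b) // false: comparing two objects compares references
console.log(typeof a) // 'object'
console.log(`${a}`) // 'chan #1', thanks to toString()
console.log(a == 'chan #1') // true: loose comparison with a primitive coerces
```

So each key is a distinct reference, but it still prints nicely in logs and error messages.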
Creating a new channel reference isn't enough, we also need to initialize the channel's state and store it somewhere.
Cuillere's context is precisely made for storing this kind of information, so let's use it:
if (operation[CHAN]) {
  const key = chanKey()
  if (!ctx[CHANS]) ctx[CHANS] = new Map() // CHANS is another Symbol we created
  ctx[CHANS].set(key, {
    // channel's initial state
  })
  return key
}
We create the channel's key, store its initial state, then return the key.
We also initialize a Map to store the channels' states if not already done (Why a Map? Well, unlike plain objects, whose keys can only be strings or symbols, Maps accept keys of any type).
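Here is a short sketch of the difference. I deliberately use two String objects with the same text to make the problem visible (our chanKey() never produces the same text twice, but we want to rely on the reference, not on the text):

```javascript
const key1 = new String('chan #1')
const key2 = new String('chan #1') // different object, same text

// Plain object: both keys are converted to the string 'chan #1' and collide.
const obj = {}
obj[key1] = 'state 1'
obj[key2] = 'state 2'
console.log(Object.keys(obj)) // [ 'chan #1' ]
console.log(obj[key1]) // 'state 2'

// Map: keys are compared by identity, so each reference gets its own entry.
const map = new Map()
map.set(key1, 'state 1')
map.set(key2, 'state 2')
console.log(map.size) // 2
console.log(map.get(key1)) // 'state 1'
```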
This is not bad, but there is still a problem: the Map will keep the channel's key and state references forever and prevent them from being garbage collected when the channel isn't used anymore...
Ideally we would like the channel's key and state to be garbage collectable as soon as no one else holds the channel's key reference anymore.
Well, this is one of the use cases for WeakMaps: their keys are weak references, therefore their content is naturally cleaned up by the garbage collector.
Let's use a WeakMap instead of a Map:
if (!ctx[CHANS]) ctx[CHANS] = new WeakMap()
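Garbage collection itself is hard to show in a snippet, but this sketch highlights a constraint that comes with WeakMaps and that our String object keys happen to satisfy: keys must be objects, a primitive string is rejected.

```javascript
const chans = new WeakMap()

// A String object is a valid key...
const key = new String('chan #1')
chans.set(key, { recvQ: [], sendQ: [] })
console.log(chans.has(key)) // true

// ...but a primitive string is not: WeakMap keys must be objects.
let error
try {
  chans.set('chan #2', {})
} catch (e) {
  error = e
}
console.log(error instanceof TypeError) // true

// Another String object with the same text is a different key.
console.log(chans.has(new String('chan #1'))) // false
```

Also keep in mind that a WeakMap has no size property and can't be iterated, which is fine here since we only ever look up a state by its key.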
Now we have to decide what the channel's initial state should be. What do we need to make the receive and send operations work?
Something to store waiting senders and receivers should be enough for now, let's call that the receive queue and send queue:
ctx[CHANS].set(key, {
  recvQ: [],
  sendQ: [],
})
Nice! I think we are ready to move on and start implementing the receive and send operations.
One last thing that can be improved for the channel creation is the WeakMap initialization.
Cuillere has a special start operation which happens only once just after calling cllr.call(), so let's use this instead of a lazy initialization:
if (isStart(operation)) ctx[CHANS] = new WeakMap()
Our complete code now looks like this:
import { isStart } from '@cuillere/core'
const CHANS = Symbol('CHANS')
const CHAN = Symbol('CHAN')
export const chan = () => {
  return {
    [CHAN]: true
  }
}
let nextChanId = 1
const chanKey = () => new String(`chan #${nextChanId++}`)
export const channelMiddleware = () => (next, ctx) => async operation => {
  if (isStart(operation)) ctx[CHANS] = new WeakMap()
  if (operation[CHAN]) {
    const key = chanKey()
    ctx[CHANS].set(key, {
      recvQ: [],
      sendQ: [],
    })
    return key
  }
  return next(operation)
}
And now the fun begins! Let's start with the receive operation.
Receive from a channel
Just like the channel creation, the receive needs an operation factory, except this time we need to pass the key of the channel we want to receive from:
const RECV = Symbol('RECV')
export const recv = (chanKey) => {
  return {
    [RECV]: true,
    chanKey,
  }
}
Then we must handle the operation in the middleware:
export const channelMiddleware = () => (next, ctx) => async operation => {
  // ...
  if (operation[RECV]) {
    const chanState = ctx[CHANS].get(operation.chanKey)
    // Receive from channel...
  }
  return next(operation)
}
We fetch the state of the channel from the context using the channel's key of the operation.
Now there are two possibilities when receiving from a channel:
- the send queue has senders: we receive from the first sender
- the send queue is empty: we add a receiver in the receive queue
Let's handle the case of a non empty send queue first:
if (operation[RECV]) {
  // ...
  const sender = chanState.sendQ.shift()
  if (sender) return sender()
}
This is pretty straightforward, we just shift the first sender from the send queue (shift() returns undefined if the send queue is empty).
If the sender is defined we call it to get the value and return it right away.
Now let's handle the case of the empty send queue:
if (operation[RECV]) {
  // ...
  return new Promise(resolve => {
    chanState.recvQ.push(resolve)
  })
}
This is pretty simple too: we create and return a new Promise and push its resolve function at the end of the receive queue.
As soon as a sender calls the receiver with a value, the Promise will be resolved.
As you can see, by using push() and shift() on both the receive and send queues, we can make them act as FIFO queues.
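Here is a stand-alone sketch of that queue of resolve functions, outside of any middleware. waitForValue() is a helper I made up for the illustration: it plays the role of the receive operation when no sender is waiting.

```javascript
const recvQ = []

// Each call registers a waiting receiver and returns the Promise it waits on.
const waitForValue = () => new Promise(resolve => {
  recvQ.push(resolve)
})

const first = waitForValue()
const second = waitForValue()

// A sender would shift the oldest receiver and call it with a value.
recvQ.shift()('a')
recvQ.shift()('b')

// shift() on an empty queue returns undefined: nobody is waiting anymore.
console.log(recvQ.shift()) // undefined

const received = Promise.all([first, second])
received.then(values => console.log(values)) // [ 'a', 'b' ]
```

The first receiver to wait is the first one to get a value, which is the FIFO behavior we are after.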
And we are done with the receive operation! Let's move on to the send operation.
Send to a channel
The send operation's factory looks like the receive operation's one with one more argument:
const SEND = Symbol('SEND')
export const send = (chanKey, value) => {
  return {
    [SEND]: true,
    chanKey,
    value,
  }
}
value is the value to be sent to the channel.
Now let's handle the operation in the middleware:
export const channelMiddleware = () => (next, ctx) => async operation => {
  // ...
  if (operation[SEND]) {
    const chanState = ctx[CHANS].get(operation.chanKey)
    // Send to channel...
  }
  return next(operation)
}
Just like when receiving, there are two possibilities when sending to a channel:
- the receive queue has receivers: we send to the first receiver
- the receive queue is empty: we add a sender in the send queue
The case of a non empty receive queue should be pretty straightforward:
if (operation[SEND]) {
  // ...
  const recver = chanState.recvQ.shift()
  if (recver) {
    recver(operation.value)
    return
  }
}
We shift the first receiver from the receive queue and, if it is defined, we call it with the operation's value, then we return right away.
The empty receive queue case is a little more complex:
if (operation[SEND]) {
  // ...
  return new Promise(resolve => {
    chanState.sendQ.push(() => {
      resolve()
      return operation.value
    })
  })
}
We create and return a new Promise, but this time we cannot push the raw resolve function in the senders queue.
We have to create a sender arrow function which resolves the Promise and returns the operation's value.
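To check that the two halves fit together, here is a sketch that extracts the same logic into plain functions working directly on a channel state, without cuillere. newChanState(), recvFrom() and sendTo() are names I made up for this illustration. It exercises both orders: sender first, then receiver first.

```javascript
// Hypothetical helpers mirroring the middleware branches described above.
const newChanState = () => ({ recvQ: [], sendQ: [] })

function recvFrom(chanState) {
  const sender = chanState.sendQ.shift()
  if (sender) return Promise.resolve(sender())
  return new Promise(resolve => chanState.recvQ.push(resolve))
}

function sendTo(chanState, value) {
  const recver = chanState.recvQ.shift()
  if (recver) {
    recver(value)
    return Promise.resolve()
  }
  return new Promise(resolve => {
    chanState.sendQ.push(() => {
      resolve() // unblock the sender...
      return value // ...and hand the value to the receiver
    })
  })
}

async function demo() {
  const state = newChanState()

  // Sender arrives first: it is queued until someone receives.
  const sending = sendTo(state, 1)
  const queuedSenders = state.sendQ.length
  const first = await recvFrom(state)
  await sending

  // Receiver arrives first: it is queued until someone sends.
  const receiving = recvFrom(state)
  const queuedReceivers = state.recvQ.length
  await sendTo(state, 2)
  const second = await receiving

  return { queuedSenders, first, queuedReceivers, second }
}

const result = demo()
result.then(r => console.log(r))
// { queuedSenders: 1, first: 1, queuedReceivers: 1, second: 2 }
```

Whichever side shows up first waits in its queue, and the other side completes the exchange.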
🎉 And this is it! We have all we need to make our send123() example work (we don't need to implement the fork operation, which is already built into cuillere).
The full example is available on repl.it (it uses esm in order to benefit from modules).
What next?
As you already know or as you may have guessed, Go channels offer a larger feature set than just sending and receiving values:
- channels may have a buffer
- channels may be closed
- range lets you iterate over a channel
- select lets you wait on multiple channel operations
So I have four more posts coming in which I will implement the full feature set of channels:
- Go channels in JS (2/5): Buffering
- Go channels in JS (3/5): Closing
- Go channels in JS (4/5): Ranging
- Go channels in JS (5/5): Selecting
I hope you enjoyed this first post, give a ❤️, 💬 leave a comment, or share it with others, and follow me to get notified of my next posts.