Go channels in JS (1/5): Sending and Receiving

Nicolas Lepage - Dec 2 '19 - - Dev Community

This post is the first of a series about how I wrote in JavaScript the equivalent of Go(lang) channels.

I did this mainly for fun, I really like Go's concurrency model, and as a daily JS developer I thought it was a good challenge to recreate it in JS.

Whether you already know Go's concurrency model or not isn't important, I'll show very simple examples along the way, and it's a good opportunity to learn a different paradigm from Promise and async/await.

Furthermore, I'm making an extensive use of JS's generator functions, and some other funny things such as WeakMap or async iterables (with for await ... of), so you might also learn a few things about JS!

Now let's start with a short presentation of Go channels and their basic usage.

Go channels

Go has a really simple way of starting new threads (these are very lightweight threads called goroutine) with the go keyword:

func main() {
    go func1()
    ...
}

func func1() { ... }
Enter fullscreen mode Exit fullscreen mode

In the above example, func1() will start in a new goroutine and execute concurrently with main().
The equivalent in JS would be calling an async function without using await:

async function main() {
    func1()
    ...
}

async function func1() { ... }

Enter fullscreen mode Exit fullscreen mode

When several functions execute concurrently in JS, it is safe to share some memory between these.

This is one of the nice properties of JS's event loop, as a developer you don't have to ensure that you're accessing memory in a safe way, it is always the case!

This differs from Go where, if using shared memory, you must protect it using mutex to ensure that only one goroutine at a time is accessing some variable(s).

However in Go you also have the possibility to avoid the use of shared memory, and that's where channels come in handy:

func main() {
  ch := make(chan int) // Create an integer channel

  go send123(ch) // Start send123() in a new goroutine

  // Receive an integer from ch and print it to stdout 3 times
  fmt.Println(<-ch)
  fmt.Println(<-ch)
  fmt.Println(<-ch)
}

func send123(ch chan int) {
  // Send 3 integers to ch
  ch <- 1
  ch <- 2
  ch <- 3
}

Enter fullscreen mode Exit fullscreen mode

In the above example, main() creates an integer channel, starts send123() in a new goroutine, and then receives an integer from the channel and prints it to standard output 3 times.
send123() sends 3 integers to the channel. On each side, the channel operations are blocking (main() will block until send123() sends an integer, and vice versa).

As you can see channels are pretty simple to use, and allow to avoid sharing memory between main() and send123() except for the channel reference.

So basically I've just said earlier that channels are useless in JS 🤪! They resolve the problem of shared memory which doesn't exist in JS...

But! There's more to channels. They offer a different way of programming concurrent applications, which might feel more natural to a lot of people.

They also come with some interesting concurrency patterns such as:

  • Pipeline
  • Fan in
  • Fan out
  • Timeout

Finally, I already said this but I'm going to repeat myself, I did this mainly for fun!
Now let's start exploring how we might implement channels in JS...

Implementing channels in JS

We now have a rough idea of what Go channels are and how they are used, but how could we implement the same thing in JS?

We could use async/await in some different ways:

  • Use a plain object to hold the state of a channel, and define send() and recv() async functions which would take a channel's state as first argument
  • Define a Channel prototype or ES6 class which would hold the state of a channel and have send() and recv() async methods

Or we could use generator functions and the yield keyword to hide the magic away: channels would be just references, and we would yield receive and send operations on these references.

I won't really discuss the pros and cons of using one solution or another here, I chose to use generator functions for the following reasons:

  • Generator functions (and the yield keyword) will allow us to implement channels with a way of working closer to that of Go channels
  • A friend and I have been developing a generator functions runner framework called Cuillere (🥄 spoon in french) and I love using it 😁

So now that we I have decided to use generator functions, we can imagine what the final result would look like.
Let's transpose our Go example with send123():

function* main() {
  const ch = yield chan() // Create a channel

  yield fork(send123, ch) // Start send123()

  // Receive a value from ch and log it to console 3 times
  console.log(yield recv(ch))
  console.log(yield recv(ch))
  console.log(yield recv(ch))
}

function* send123(ch) {
  // Send 3 integers to ch
  yield send(ch, 1)
  yield send(ch, 2)
  yield send(ch, 3)
}
Enter fullscreen mode Exit fullscreen mode

Pretty cool! And using the yield keyword we are going to make sure that recv() and send() operations are blocking, just like in Go! (main() blocks until send123() sends an integer, and vice versa).

There are two additional operations:

  • chan() creates a new channel
  • fork() starts the execution of a generator function without waiting for it to complete (this one might look familiar if you are a redux-saga user)

Great! We have defined what we want, our goal now is to implement all the hidden mechanics which will allow this code to work.

But before diving into this, let me give you a short presentation of what cuillere is and how it is going to help us.

🥄 Cuillere! (queeyeah!)

CuillereJS is an extensible asynchronous execution framework based on generator functions.

So you are probably wondering why we created this Cuillere thing in the first place...
Well the goal of Cuillere is to abstract some inevitable technical complexity in plugins, and keep business code as simple and focused as possible.

At the time, we were working on a NodeJS/GraphQL backend API with a PostgreSQL database, and in order to ensure the integrity of our data we had to manage transactions properly.

I won't get into the details here, but managing transactions soon became a real pain for two reasons:

  • using a callback function every time we had to ensure a transaction was opened
  • passing the "transaction aware" PostgreSQL client everywhere we had to perform a database operation

Our code had become overly complex just because of transactions management... It was hard to read and often buggy!
That is when we created Cuillere, which allowed us to strip our business code from all the complexity of transactions management.

Here is a simple example of how cuillere is used with PostgreSQL:

const cuillere = require('@cuillere/core')
const {
  poolMiddleware, transactionMiddleware, queryMiddleware
} = require('@cuillere/postgres')

const cllr = cuillere(
  poolMiddleware({ /* postgres config */ }), // Manages connection pool
  transactionMiddleware(), // Manages transactions
  queryMiddleware() // Executes queries
)

const addUserAddress = (userId, address, setDefault) => cllr.call(function*() {
  const res = yield query({
    text: `INSERT INTO addresses (userId, street, postalcode, city)
           VALUES ($1, $2, $3, $4)
           RETURNING *`,
    values: [userId, address.street, address.postalCode, address.city]
  })
  if (setDefault) {
    const addressId = res.rows[0].id
    yield query({
      text: `UPDATE users
             SET defaultaddressid = $1
             WHERE userid = $2`,
      values: [addressId, userId]
    })
  }
})
Enter fullscreen mode Exit fullscreen mode

As you can see the business code doesn't have to manage transactions, neither manipulate PostgreSQL clients.
It is all taken care of by the plugins!

The plugins are responsible for executing the operations yielded by the business code.
Cuillere gives them a context object, which they can use to store state or communicate for example.

I will probably write some more about Cuillere itself in the future, for now let's focus back on our JS channels...

Implementing channels

We are going to need a channel plugin to give to Cuillere, which will take care of channel operations (creation, sending and receiving):

const cllr = cuillere(
  channelMiddleware()
)

cllr.call(function* () {
  const ch = yield chan()
  // ...
})
Enter fullscreen mode Exit fullscreen mode

It is in this channelMiddleware that we are going to implement all the mechanics to make channels work as expected.

The signature of a cuillere middleware is pretty simple (the first level of currying isn't necessary, but it is a convention to have a factory):

export const channelMiddleware = () => (next, ctx) => async operation => {
  // ...
}
Enter fullscreen mode Exit fullscreen mode

The middleware has a first level with the next and ctx arguments:

  • next is the next middleware and should be called when the current middleware doesn't know how to handle the received operation
  • ctx is the context object

The second level of the middleware has the operation argument, and is called each time an operation is yielded.

Let's start by implementing all the mechanics for creating a channel.

channel creation

The first thing we need is a factory for channel creation operations:

const CHAN = Symbol('CHAN')
export const chan = () => {
  return {
    [CHAN]: true
  }
}
Enter fullscreen mode Exit fullscreen mode

We use an unexported Symbol to mark the operation and be able to recognize it in the middleware:

export const channelMiddleware = () => (next, ctx) => async operation => {
  if (operation[CHAN]) {
    // Create channel
  }

  return next(operation)
}
Enter fullscreen mode Exit fullscreen mode

As I said earlier we want the channels to be just references. We could use empty plain objects, but for debugging purposes let's use something a little less opaque:

let nextChanId = 1
const chanKey = () => new String(`chan #${nextChanId++}`)
Enter fullscreen mode Exit fullscreen mode

The String constructor gives us a unique reference (new String('foo') !== new String('foo') is always true), with a ready to use toString() method.

Creating a new channel reference isn't enough, we also need to initiate the channel's state and store it somewhere.
Cuillere's context is precisely made for storing this kind of information, so let's use it:

if (operation[CHAN]) {
  const key = chanKey()
  if (!ctx[CHANS]) ctx[CHANS] = new Map() // another Symbol we created
  ctx[CHANS].set(key, {
    // channel's initial state
  })
  return key
}
Enter fullscreen mode Exit fullscreen mode

We create the channel's key, store it's initial state, then return the key.
We also initialize a Map to store the channels' states if not already done (Why a Map? Well unlike plain objects whose keys need to be plain strings, Maps accept any type of keys).

This is not bad but there is still a problem, the Map will keep the channel's key and state references forever and prevent them to be garbage collected when the channel isn't used anymore...

Ideally we would like the channel's key and state to be garbage collectable as soon as no one else holds the channel's key reference anymore.
Well this is one of the use cases for WeakMaps, their keys are week references therefore their content is naturally cleaned up by the garbage collector.

Let's use a WeakMap instead of a Map:

if (!ctx[CHANS]) ctx[CHANS] = new WeakMap()
Enter fullscreen mode Exit fullscreen mode

Now we have to decide what the channel's initial state should be. What do we need to make the receive and send operations work?
Something to store waiting senders and receivers should be enough for now, let's call that the receive queue and send queue:

ctx[CHANS].set(key, {
  recvQ: [],
  sendQ: [],
})
Enter fullscreen mode Exit fullscreen mode

Nice! I think we are ready to move on and start implementing the receive and send operations.

One last thing that can be improved for the channel creation is the WeakMap initialization.
Cuillere has a special start operation which happens only once just after calling cllr.call(), let's use this instead of a lazy initialization:

if (isStart(operation)) ctx[CHANS] = new WeakMap()
Enter fullscreen mode Exit fullscreen mode

Our complete code now looks like this:

import { isStart } from '@cuillere/core'

const CHANS = Symbol('CHANS')

const CHAN = Symbol('CHAN')
export const chan = () => {
  return {
    [CHAN]: true
  }
}

let nextChanId = 1
const chanKey = () => new String(`chan #${nextChanId++}`)

export const channelMiddleware = () => (next, ctx) => async operation => {
  if (isStart(operation)) ctx[CHANS] = new WeakMap()

  if (operation[CHAN]) {
    const key = chanKey()
    ctx[CHANS].set(key, {
      recvQ: [],
      sendQ: [],
    })
    return key
  }

  return next(operation)
}
Enter fullscreen mode Exit fullscreen mode

And now the fun begins! Let's start with the receive operation.

Receive from a channel

Just like the channel creation, the receive needs an operation factory, except this time we need to give the channel's key from which we want to receive:

const RECV = Symbol('RECV')
export const recv = (chanKey) => {
  return {
    [RECV]: true,
    chanKey,
  }
}
Enter fullscreen mode Exit fullscreen mode

Then we must handle the operation in the middleware:

export const channelMiddleware = () => next => async (operation, ctx) => {
  // ...

  if (operation[RECV]) {
    const chanState = ctx[CHANS].get(operation.chanKey)

    // Receive from channel...
  }

  return next(operation)
}
Enter fullscreen mode Exit fullscreen mode

We fetch the state of the channel from the context using the channel's key of the operation.

Now there are two possibilities when receiving from a channel:

  • the send queue has senders: we receive from the first sender
  • the send queue is empty: we add a receiver in the receive queue

Let's handle the case of a non empty send queue first:

if (operation[RECV]) {
  // ...

  const sender = chanState.sendQ.shift()
  if (sender) return sender()
}
Enter fullscreen mode Exit fullscreen mode

This is pretty straightforward, we just shift the first sender from the send queue (shift() returns undefined if the send queue is empty).
If the sender is defined we call it to get the value and return it right away.

Now let's handle the case of the empty send queue:

if (operation[RECV]) {
  // ...

  return new Promise(resolve => {
    chanState.recvQ.push(resolve)
  })
}
Enter fullscreen mode Exit fullscreen mode

This is pretty simple too, we create and return a new Promise and push its resolve function at the end of the receive queue.
As soon as a sender will call the receiver with a value, the Promise will be resolved.

As you can see, by using push() and shift() on both the receive and send queues, we can make them act as FIFO queues.

And we are done with the receive operation! Let's move on to the send operation.

Send to channel

The send operation's factory looks like the receive operation's one with one more argument:

const SEND = Symbol('SEND')
export const send = (chanKey, value) => {
  return {
    [SEND]: true,
    chanKey,
    value,
  }
}
Enter fullscreen mode Exit fullscreen mode

value is the value to be sent to the channel.

Now let's handle the operation in the middleware:

export const channelMiddleware = () => (next, ctx) => async operation => {
  // ...

  if (operation[SEND]) {
    const chanState = ctx[CHANS].get(operation.chanKey)

    // Send to channel...
  }

  return next(operation)
}
Enter fullscreen mode Exit fullscreen mode

Just like when receiving, there are two possibilities when sending to a channel:

  • the receive queue has receiver: we send to the first receiver
  • the receive queue is empty: we add a sender in the send queue

The case of a non empty receive queue should be pretty straightforward:

if (operation[SEND]) {
  // ...

  const recver = chanState.recvQ.shift()
  if (recver) {
    recver(operation.value)
    return
  }
}
Enter fullscreen mode Exit fullscreen mode

We shift the first receiver from the receive queue and, if it is defined, we call it with the operation's value, then we return right away.

The empty receive queue case is a little more complex:

if (operation[SEND]) {
  // ...

  return new Promise(resolve => {
    chanState.sendQ.push(() => {
      resolve()
      return operation.value
    })
  })
}
Enter fullscreen mode Exit fullscreen mode

We create and return a new Promise, but this time we cannot push the raw resolve function in the senders queue.
We have to create a sender arrow function which resolves the Promise and returns the operation's value.

🎉 And this is it! We have all we need to make our send123() example work (we don't need to implement the fork operation which is already built into cuillere).

The full example is available on repl.it (it uses esm in order to benefit of modules):

What next?

As you already know or as you may have guessed, Go channels offer a larger feature set than just sending and receiving values:

  • channels may have a buffer
  • channels may be closed
  • range lets you iterate over a channel
  • select lets you wait on multiple channel operations

So I have four more posts coming in which I will implement the full feature set of channels:

I hope you enjoyed this first post, give a ❤️, 💬 leave a comment, or share it with others, and follow me to get notified of my next posts.

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .