Go channels in JS (3/5): Closing

Nicolas Lepage - Dec 17 '19 - Dev Community

This post is the third of a series about how I wrote in JavaScript the equivalent of Go(lang) channels.

If you haven't already, I highly recommend reading at least the first post of the series before reading this one.

In the previous posts we built an equivalent of Go channels in JS.
We were able to create channels, buffered or unbuffered, send values to these, and receive values from these.

This time we will add a new feature to our JS channels: Closing.

But why the heck would we want to close a channel? Let's answer this for a start.

Closing channels

Closing a channel in Go is pretty easy. You just have to call the built-in close() function (built-in functions like close() and make() are available without importing any package):

func example() {
  ch := make(chan int) // Create a channel

  // Use the channel...

  close(ch) // Close the channel
}

Closing the channel has several effects.

First, it is not possible to send values to a closed channel: the program will panic (throw an error) if you do.
Therefore it is always the responsibility of the sender to close a channel.

Second, once the channel's buffer is empty, the information that it is closed will be broadcast to any further receivers.

But what does "broadcasting the information that it is closed" mean? How is this done in concrete terms?
Well, a receive operation on a closed channel (with no values left in its buffer) returns immediately with the zero value of the channel's element type.

Let's take back our send123() example from the previous posts:

package main

import "fmt"

func main() {
  ch := make(chan int) // Create an integer channel

  go send123(ch) // Start send123() in a new goroutine

  // Receive an integer from ch and print it to stdout 6 times
  fmt.Println(<-ch) // Prints 1
  fmt.Println(<-ch) // Prints 2
  fmt.Println(<-ch) // Prints 3
  fmt.Println(<-ch) // Prints 0
  fmt.Println(<-ch) // Prints 0
  fmt.Println(<-ch) // Prints 0
}

func send123(ch chan int) {
  // Send 3 integers to ch
  ch <- 1
  ch <- 2
  ch <- 3

  close(ch) // Close ch
}

As you can see, once ch is closed, any further receive operation returns the zero value, which is 0 for integers.

We could write a loop that receives integers from ch and assumes the channel is closed as soon as it receives 0.
But what if we want to send 0 over ch?
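To make the problem concrete, here is a small sketch in plain JavaScript. It does not use our channels: makeClosedIntSource() and drainUntilZero() are hypothetical helpers that only mimic a closed Go integer channel, which hands out its remaining values and then the zero value forever.

```javascript
// Hypothetical stand-in for a closed integer channel:
// queued values first, then the zero value 0 forever.
const makeClosedIntSource = values => {
  const queue = [...values]
  return () => (queue.length > 0 ? queue.shift() : 0)
}

// Naive consumer: treats the first 0 as "the channel is closed"
const drainUntilZero = recvFn => {
  const received = []
  for (let v = recvFn(); v !== 0; v = recvFn()) received.push(v)
  return received
}

const withoutZero = drainUntilZero(makeClosedIntSource([1, 2, 3]))
const withZero = drainUntilZero(makeClosedIntSource([1, 0, 3]))

console.log(withoutZero) // [ 1, 2, 3 ]
console.log(withZero) // [ 1 ]
```

In the second run the consumer stops at the legitimate 0 and never sees the 3, which is exactly the ambiguity we want to get rid of.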

We need a better way of knowing if the channel is closed.
And Go provides that by letting us receive two values from a channel:

func example(ch chan int) {
  i, ok := <-ch

  if ok {
    fmt.Println("Received:", i)
  } else {
    fmt.Println("Channel closed")
  }
}

As you can see, when receiving two values from a channel, the second value is a boolean which tells us whether we actually received something. If it is false, nothing was received and the channel is closed.

So let's use this second value to build a loop, and iterate over the integers received from send123():

package main

import "fmt"

func main() {
  ch := make(chan int) // Create an integer channel

  go send123(ch) // Start send123() in a new goroutine

  // Receive an integer from ch and print it until ch is closed
  for { // This is like while (true)
    i, ok := <-ch
    if !ok {
      break
    }
    fmt.Println(i)
  }
}

func send123(ch chan int) {
  // Send 3 integers to ch
  ch <- 1
  ch <- 2
  ch <- 3

  close(ch) // Close ch
}

Now how could we do the same thing in JS?

In JS it is not possible to return two values, but we can return them together in a two-element array.
However the receive operation won't be able to know if we want only the value, or an array with the value and a boolean...

We are forced to add an optional argument to the receive operation, which we will use when we want to know if the channel is closed.
Now let's transpose our example to JS:

function* main() {
  const ch = yield chan() // Create a channel

  yield fork(send123, ch) // Start send123()

  // Receive a value from ch and log it to console until ch is closed
  while (true) {
    const [i, ok] = yield recv(ch, true)
    if (!ok) break
    console.log(i)
  }
}

function* send123(ch) {
  // Send 3 integers to ch
  yield send(ch, 1)
  yield send(ch, 2)
  yield send(ch, 3)

  yield close(ch) // Close ch
}

The second argument of our receive operation allows us to ask for a detailed receive by setting it to true.

And of course, there is the new close operation that allows us to close the channel.

Now let's add this closing feature to our JS channels!

Implementing channel closing

Let's start with the close operation.

Close operation

As usual we need an operation factory:

const CLOSE = Symbol('CLOSE')
export const close = chanKey => {
  return {
    [CLOSE]: true,
    chanKey,
  }
}

This operation needs only chanKey which is the key of the channel we want to close.
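If you are wondering what such an operation looks like once created, here is a small standalone sketch. The 'chan-1' key is made up for the illustration (real keys come from the chan operation), and the export is dropped so the snippet runs on its own.

```javascript
const CLOSE = Symbol('CLOSE')

// Same factory as above, without the export
const close = chanKey => ({ [CLOSE]: true, chanKey })

// 'chan-1' is a made-up key, used only for this illustration
const op = close('chan-1')

console.log(op[CLOSE]) // true
console.log(op.chanKey) // chan-1

// The symbol tag is invisible to string-based inspection...
console.log(Object.keys(op)) // [ 'chanKey' ]
console.log(JSON.stringify(op)) // {"chanKey":"chan-1"}

// ...and a plain object with a "CLOSE" string key is not mistaken for a close operation
const lookalike = { chanKey: 'chan-1', CLOSE: true }
console.log(lookalike[CLOSE]) // undefined
```

Tagging operations with a symbol means the middleware can only recognize objects built by our own factory, since nobody else has access to the CLOSE symbol.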

Then we also need a boolean in the channel's state in order to know if the channel is closed.
Let's initialize this boolean when we create a channel:

export const channelMiddleware = () => (next, ctx) => async operation => {
  if (operation[CHAN]) {
    const key = chanKey(operation.bufferSize)

    ctx[CHANS].set(key, {
      sendQ: [],
      recvQ: [],
      buffer: Array(operation.bufferSize),
      bufferLength: 0,
      closed: false, // New closed flag
    })

    return key
  }

  // ...
}

Finally we need to handle close operations in the channel middleware:

export const channelMiddleware = () => (next, ctx) => async operation => {
  // ...

  if (operation[CLOSE]) {
    const chanState = ctx[CHANS].get(operation.chanKey)

    if (chanState.closed) throw new TypeError(`close on closed ${operation.chanKey}`)

    chanState.closed = true

    return
  }

  // ...
}

First we check if the channel was already closed, in which case we throw an error.
If not, we flag the channel as closed.

However this isn't enough. Some receivers may already be waiting in the receive queue, so we also need to drain them out of it:

if (operation[CLOSE]) {
  // ...

  let recver
  while (recver = chanState.recvQ.shift()) recver([undefined, false])

  return
}

We are giving an array with an undefined value and a false boolean to the receiver, meaning that nothing was received and the channel is closed.
It will be the responsibility of the receiver to decide whether to return the array or only the value, but we'll talk about that later...

At this point you may wonder why we aren't checking the channel's buffer or even send queue.
But you have to remember that every operation leaves the channel in a stable state, so if we have receivers in the receive queue we can safely assume that there are no senders in the send queue and no values in the buffer.
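To see both steps of the close handling working together, here is a minimal standalone sketch. It is not the middleware itself: closeChannel() and makeChanState() are hypothetical helpers, the channel state is reduced to the fields the close logic touches, and plain callbacks stand in for the promise resolvers that would normally sit in the receive queue.

```javascript
// Simplified channel state: only what the close logic needs
const makeChanState = () => ({ sendQ: [], recvQ: [], closed: false })

// Hypothetical helper combining the closed check and the draining loop
const closeChannel = (chans, chanKey) => {
  const chanState = chans.get(chanKey)
  if (chanState.closed) throw new TypeError(`close on closed ${chanKey}`)
  chanState.closed = true

  // Wake every waiting receiver with "nothing received, channel closed"
  let recver
  while ((recver = chanState.recvQ.shift())) recver([undefined, false])
}

const chans = new Map([['ch', makeChanState()]])
const results = []

// Two receivers waiting on the empty channel
chans.get('ch').recvQ.push(res => results.push(res))
chans.get('ch').recvQ.push(res => results.push(res))

closeChannel(chans, 'ch')
console.log(results) // [ [ undefined, false ], [ undefined, false ] ]

// Closing a second time is an error
let secondCloseError = null
try {
  closeChannel(chans, 'ch')
} catch (err) {
  secondCloseError = err
}
console.log(secondCloseError.message) // close on closed ch
```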

And this is it for the close operation! Let's move on to the receive operation...

Receive operation

The receive operation must now be able to return either only the value, or an array with the value and a boolean, which we will call a "detailed receive".

So let's start by adding an optional detail argument to the operation factory:

const RECV = Symbol('RECV')
export const recv = (chanKey, detail = false) => {
  return {
    [RECV]: true,
    chanKey,
    detail,
  }
}

detail defaults to false, so by default a receive operation will return only the value.

Now let's use this new detail flag in the channel middleware.

We could use it in every case where we are returning from a receive operation... But we would duplicate the same code.
Instead, let's extract a new doRecv() function which will always return a detailed receive:

const doRecv = async (ctx, chanKey) => {
  const chanState = ctx[CHANS].get(chanKey)

  if (chanState.bufferLength !== 0) {
    const value = chanState.buffer[0]
    chanState.buffer.copyWithin(0, 1)

    // ...

    return [value, true]
  }

  const sender = chanState.sendQ.shift()
  if (sender) return [sender(), true]

  return new Promise(resolve => {
    chanState.recvQ.push(resolve)
  })
}

We return an array when receiving from the buffer or from the send queue.

There is still a third case, when we push a receiver to the receive queue. This receiver must be called with a detailed receive, so let's keep this in mind for the send operation.

We also have to check if the channel is closed.
Receiving values from a closed channel is still possible as long as some remain, so we must perform this check only once its buffer and send queue are empty, just before pushing to the receive queue:

const doRecv = async (ctx, chanKey) => {
  // ...

  if (chanState.closed) return [undefined, false]

  return new Promise(resolve => {
    chanState.recvQ.push(resolve)
  })
}

If the channel is closed, we return an array with an undefined value and a false boolean, meaning that nothing was received.
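The order of these checks is what lets a closed channel still deliver its remaining values. Here is a simplified, synchronous sketch of that ordering. recvNow() is a hypothetical stand-in for doRecv(): the buffer is a plain array rather than the fixed-size one used by our channels, and "would block" is reported as null instead of pushing a receiver to the receive queue.

```javascript
// Hypothetical, synchronous stand-in for doRecv()
const recvNow = chanState => {
  // 1. Buffered values come first, even if the channel is closed
  if (chanState.buffer.length !== 0) return [chanState.buffer.shift(), true]

  // 2. Then a waiting sender, if there is one
  const sender = chanState.sendQ.shift()
  if (sender) return [sender(), true]

  // 3. Only now does the closed flag matter
  if (chanState.closed) return [undefined, false]

  // 4. Open and empty: the real doRecv() would push to recvQ here
  return null
}

// A closed channel that still holds two buffered values
const chanState = { buffer: [1, 2], sendQ: [], closed: true }

const first = recvNow(chanState)
const second = recvNow(chanState)
const third = recvNow(chanState)

console.log(first, second, third) // [ 1, true ] [ 2, true ] [ undefined, false ]
```

If the closed check came first, the values 1 and 2 would be lost as soon as the channel was closed.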

Finally let's use our new doRecv() function in the channel middleware:

export const channelMiddleware = () => (next, ctx) => async operation => {
  // ...

  if (operation[RECV]) {
    const res = await doRecv(ctx, operation.chanKey)
    return operation.detail ? res : res[0]
  }

  // ...
}

Pretty simple: we use the detail flag only once, when doRecv() returns, to decide whether to return the whole array or only the value.
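Here is a tiny sketch of what this choice means for the caller. shapeResult() is a hypothetical helper that isolates the last line of the receive handler, and the three arrays are hand-written examples of what doRecv() can resolve with.

```javascript
// Hypothetical helper isolating the last line of the RECV handler
const shapeResult = (res, detail = false) => (detail ? res : res[0])

const received = [42, true] // a value was received
const closed = [undefined, false] // the channel is closed
const sentUndefined = [undefined, true] // someone really sent undefined

// Plain receive: only the value
console.log(shapeResult(received)) // 42
console.log(shapeResult(closed)) // undefined
console.log(shapeResult(sentUndefined)) // undefined

// Detailed receive: the boolean tells the two undefined cases apart
console.log(shapeResult(closed, true)) // [ undefined, false ]
console.log(shapeResult(sentUndefined, true)) // [ undefined, true ]
```

A plain receive on a closed channel gives undefined, much like Go gives the zero value, so it has the same ambiguity. The detailed receive is the only reliable way to detect closing.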

And this is it for the receive operation! Let's end with the send operation...

Send operation

This one is going to be much simpler.

The first thing we need to do is check that we are not sending on a closed channel:

if (operation[SEND]) {
  const chanState = ctx[CHANS].get(operation.chanKey)

  if (chanState.closed) throw new TypeError(`send on closed ${operation.chanKey}`)

  // ...
}

Then all we have to do is make sure that we are giving a detailed receive when calling a receiver from the receive queue:

if (operation[SEND]) {
  // ...

  const recver = chanState.recvQ.shift()
  if (recver) {
    recver([operation.value, true])
    return
  }
}

We give the receiver an array with the value and a true boolean, meaning that something has actually been received.
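Both changes to the send operation can be tried out in isolation with the following sketch. trySend() is a hypothetical helper, not the actual middleware code: it only models the closed check and the hand-off to a waiting receiver, and leaves out the buffer and the send queue.

```javascript
// Hypothetical, simplified send
const trySend = (chanState, chanKey, value) => {
  if (chanState.closed) throw new TypeError(`send on closed ${chanKey}`)

  const recver = chanState.recvQ.shift()
  if (recver) {
    recver([value, true]) // Detailed receive: a value was actually received
    return true
  }

  return false // The real middleware would use the buffer or the send queue here
}

// An open channel with one waiting receiver
const parked = []
const openChan = { closed: false, recvQ: [res => parked.push(res)] }

const delivered = trySend(openChan, 'ch', 'hello')
console.log(delivered, parked) // true [ [ 'hello', true ] ]

// Sending on a closed channel throws
let sendError = null
try {
  trySend({ closed: true, recvQ: [] }, 'ch', 'too late')
} catch (err) {
  sendError = err
}
console.log(sendError.message) // send on closed ch
```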

And we have made it! We are now able to close our JS channels.
You can try this out on repl.it with our send123() example (it uses esm to benefit from modules).

What next

Next time, we are going to take full advantage of what we have built so far, and add a cool feature to our JS channels: Ranging!

So I have two more posts coming:

  • Go channels in JS (4/5): Ranging
  • Go channels in JS (5/5): Selecting

And maybe some bonus posts.

I hope you enjoyed this third one. Give it a ❤️, leave a 💬 comment, or share it with others, and follow me to get notified of my next posts.
