Sunday 25 March 2018

Understanding the node stream API, through examples and practice

We use node streams extensively in my day job, relying heavily on the competing consumers pattern to process messages from AWS queues. We pull messages off SQS, then use the node stream API to process them through various stages until we write to a MongoDB database and/or issue events that land on other SQS queues, usually via SNS.

I've been looking into how we can get more performance out of our existing codebase: what can be tweaked and what needs rewriting. It has forced me to become quite intimate with node's stream API, how buffers really work and what effect the highWaterMark has on them. I've also looked at how we could use event emitters rather than streams to make better use of node's asynchronous nature.

In this post I aim to impart some of my recent knowledge, for my own future reference but also because, having polled many devs on the subject, there are a number of misconceptions about how streams in node actually work.

Analogies

Analogies are useful. I have always thought of streams as a pipe that data flows through, like a hosepipe with water. As with most analogies it is useful but will only take you so far.

__-0_
_____==========|====================|==========
Tap   Reader       Transformer        Writer


A hose has a tap connected at one end (a reader), some length of pipe in the middle (transformers) and an opening at the other end to let the water out (a writer). When you turn on the tap, a hosepipe fills with water until it is full and then starts to issue water from the open end. If a real tap is switched off, the water stops flowing at the open end with water still in the pipe. So instead imagine a hose that is dangling vertically, still attached to the tap: when the water is switched off, the water in the pipe all drains out due to gravity. This is more like a stream in node.

Buffers

__-0_                      _
_____==========|==========|x|==========|==========
Tap   Reader         Transformer        Writer


But the water in the hose is not all being processed at the same time. Imagine 3 parts to the hose all connected together, each one capable of holding a certain amount of water; this is analogous to buffers. In the centre of the middle pipe (the transformer) you have a box: a unit of water flows into the box, then out, but only one unit of water can pass through at once. This is in fact how a transform stream works. Take a compression stream for example: one piece of data goes in, it's compressed, and the compressed data comes out ready to be written by the writer.

So you now have 4 sections of pipe, each capable of containing some water; these are the buffers. The reader has a buffer on its outgoing side, the transformer has 2 buffers (one read buffer and one write buffer) and the writer has one incoming buffer. If the highWaterMark is set to 5 for all streams you have room for 20 units of water (this is not the whole story, as I will explain later, but like I said it’s a useful analogy).

The stream's highWaterMark setting governs the desired amount of data to be in the buffer at any one time. It is not a limit, as we will show later, but more a guide that tells the stream when to pause the flow of data into the buffer.
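
To make the "guide, not a limit" point concrete, here is a minimal sketch that is not taken from the repo. It uses a writable stream whose write never completes, so everything written stays inside the stream. The names stuckWriter and heldCallbacks are made up for illustration.

```javascript
const { Writable } = require('stream');

// Hypothetical writer that never calls next(), so nothing ever leaves the stream.
const heldCallbacks = [];
const stuckWriter = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(message, encoding, next) {
    heldCallbacks.push(next); // hold on to next() instead of calling it
  },
});

// write() returns false once the high watermark is reached,
// but the stream still accepts (and buffers) every message.
const accepted = [];
for (let id = 1; id <= 5; id += 1) {
  accepted.push(stuckWriter.write({ id }));
}

console.log(accepted);                   // [ true, false, false, false, false ]
console.log(stuckWriter.writableLength); // 5
```

The false return value is only a request to stop writing. Five messages end up held by a stream whose high watermark is 2, because nothing forces the caller to respect that request.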

Classes and functions of the stream API


 _                       _                       _
|R|==========|==========|T|==========|==========|W|
    Reader         Transformer         Writer


When you implement streams you need to implement a function in each of the Reader, Transformer and Writer. The function names are quite self-explanatory: _read, _transform and _write respectively. These functions are called when data flows through the pipe, which happens once the streams are piped together thus:

readStream.pipe(transformStream).pipe(writeStream);

Once the streams are piped together, any data flowing into the read stream will be pulled down the pipe by the stream API, which will handle any buffering needed. What is key right now is that only one piece of information can flow through any given function at once; they are 'NOT' asynchronous out of the box, in the sense that each function handles a single chunk at a time. This makes sense, as you would not want a transform stream that is compressing or encrypting data to process multiple chunks simultaneously and potentially get the zipped chunks mixed up in the write buffer.
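
As a rough illustration of the three functions, here is a minimal class-based pipeline. It is a sketch rather than the code from the repo; the class names CountingReader, Doubler and Collector are invented for this example.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Reader: emits { id: 1 } .. { id: total }, then null to signal the end.
class CountingReader extends Readable {
  constructor(total) {
    super({ objectMode: true, highWaterMark: 2 });
    this.total = total;
    this.current = 0;
  }

  _read() {
    this.current += 1;
    this.push(this.current <= this.total ? { id: this.current } : null);
  }
}

// Transformer: one message in, one message out, one at a time.
class Doubler extends Transform {
  constructor() {
    super({ objectMode: true, highWaterMark: 2 });
  }

  _transform(message, encoding, next) {
    next(null, { id: message.id, value: message.id * 2 });
  }
}

// Writer: records what it receives, then asks for the next message.
class Collector extends Writable {
  constructor() {
    super({ objectMode: true, highWaterMark: 2 });
    this.written = [];
  }

  _write(message, encoding, next) {
    this.written.push(message.value);
    next();
  }
}

const writeStream = new Collector();
const done = new Promise((resolve, reject) => {
  writeStream.on('finish', () => resolve(writeStream.written));
  writeStream.on('error', reject);
});

new CountingReader(5).pipe(new Doubler()).pipe(writeStream);

done.then((values) => console.log(values)); // [ 2, 4, 6, 8, 10 ]
```

Each function hands control back by calling push or next, and that is the only point at which the stream API will give it another message.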

Now because there can be only one piece of data flowing through the functions at once the buffers will fill up if the rate of data coming into the pipe is greater than the rate of data flowing out of the pipe. Usually this is true, for example reading from a file is generally quicker than writing to another.

Data flow


If we look at what happens when a single message is inserted into the pipe you will see it do this
 _                     _                    _
|1|=========|=========| |=========|========| |
 _                     _                    _
| |=========|=========|1|=========|========| |
 _                     _                    _
| |=========|=========| |=========|========|1|


If the write is slow and 1 more message was to be read this is what would happen (for the sake of this example the buffer (highWaterMark) is set to 1 for all streams).

 _                     _                   _
| |====-====|====-====| |====2====|===x===|1|


Notice how the writer's buffer and the transformer's write buffer are both full (the writer as a whole can only have one message within it; its buffer was full but it's now processing that message). If a 3rd message was to go down the pipe it would get stuck in the transformer's read buffer. It has nowhere to go, so it will not go into the _transform function until there is room in the next buffer.

 _                     _                   _
| |====-====|====3====| |====2====|===x===|1|


Finally, if the reader pumped a 4th message into the pipe it would go into the reader's buffer.

 _                     _                   _
| |====4====|====3====| |====2====|===x===|1|


In this state nothing can flow down the pipe until the writer has finished processing message number one. This is called backpressure: messages back up behind the slowest stream (in this case the writer, but it could equally be a transformer).

Backpressure, drain and resume


That was a bit of a simplistic example, let's say the buffers are set to a high watermark of 2 rather than one. A slow write would result in the following situation (again note that the write stream can only have 2 messages in it as a whole, with only one in its buffer).

 _                       _                      _
| |===8==7===|===6==5===| |===4==3===|====2====|1|


At this point technically speaking the transform stream and the read stream are paused and will not resume until their buffers are emptied when a drain event occurs. Let's see what happens when the writer finishes writing [1]

 _                       _                      _
| |===8==7===|===6==5===| |===4==3===|====-====|2|


Notice that more data is not pulled from the transform write buffer yet.

Only when [2] finishes processing will a drain event occur on the write stream, causing it to fill again. There is enough room for 2 pieces of data in the writer: 1 in the buffer and 1 in the slow write operation. So as soon as [2] finishes being written, 3 will be pulled into the write stream

 _                       _                      _
| |===8==7===|===6==5===| |===-==4===|====3====| |


There is now room in the transforms write buffer so 5 will get pulled through the transform

 _                       _                      _
| |===8==7===|===-==6===|5|===-==4===|====3====| |


3 will then go into the writer and 4 will go into the writer's buffer
 _                       _                      _
| |===8==7===|===-==6===|5|===-==-===|====4====|3|


When 5 finishes its transform, 6 will also go through the transform, causing the transform's read buffer to drain
 _                       _                      _
| |===8==7===|===-==-===| |===6==5===|====4====|3|

7 and 8 will then flow into the transform's read buffer
 _                       _                      _
| |===-==-===|===8==7===| |===6==5===|====4====|3|

This causes the reader's buffer to drain, which will then pull 2 more messages in through the reader
 _                       _                      _
| |==10==9===|===8==7===| |===6==5===|====4====|3|

And we are back to the state we had originally: we are now waiting for the slow write operation to complete on message #3
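
The drain behaviour in this walk-through can be observed with a hand-written producer that respects the return value of write(). This is a minimal sketch under my own assumptions (a 10ms timer standing in for the slow write, and made-up names slowWriter and produce); it is not the code from the repo.

```javascript
const { Writable } = require('stream');

const log = [];
const slowWriter = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(message, encoding, next) {
    // Simulate a slow write with a timer.
    setTimeout(() => {
      log.push(`written ${message}`);
      next();
    }, 10);
  },
});
slowWriter.on('drain', () => log.push('drain'));

// Writes ids 1..lastId, stopping whenever write() returns false
// and carrying on only when the stream emits 'drain'.
function produce(lastId) {
  return new Promise((resolve, reject) => {
    slowWriter.on('error', reject);
    let id = 1;
    const writeMore = () => {
      while (id <= lastId) {
        const ok = slowWriter.write(id);
        log.push(`write ${id} -> ${ok}`);
        id += 1;
        if (!ok) {
          slowWriter.once('drain', writeMore);
          return;
        }
      }
      slowWriter.end(() => resolve(log));
    };
    writeMore();
  });
}

const produced = produce(4);
produced.then((entries) => console.log(entries.join('\n')));
```

With a high watermark of 2 the log shows two writes accepted, then both being written, and only then a single drain, after which the next pair goes in. That is the same pairing seen in the diagrams.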

Buffering

So what you can see from the above example is a pulsing action, where messages get pulled through in pairs (the current high watermark) once 2 slow writes have occurred. This is due to the way that the backpressure gets released: the buffers fully empty before fully filling up again. Obviously this gets more complex if you have transformers that are also slow, but you will always see messages flowing through the buffers in pairs (when all the streams' high watermarks are set to 2).


Some demos

Firstly I'd like to introduce you to the repo that contains the examples. You can see there are a number of index files; each one is designed to be run with node indexFileName.js and will give a coloured output to the terminal.

https://github.com/DamianStanger/streams101

I aim to produce and demo some simple code that you can then play with, change the high watermark settings and the numbers of messages with a view to allowing you to gain a better understanding of how streams work yourselves.

Object mode

First a quick note that all the examples given below use streams in object mode. This is because in my case I'm investigating how I can get more performance out of my streams when pulling messages off an SQS queue. So to use object mode makes sense but all the concepts will be similar if you're dealing with chunked streams. The exception here is writing in an asynchronous manner, for example you would not want to write multiple chunks to a file stream asynchronously as your chunks would get all mixed up in the resultant file.
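
One practical difference between the two modes is what the high watermark counts: objects in object mode, bytes otherwise. The sketch below shows this with a writer that never completes a write; makeStuckWriter is a hypothetical helper for this illustration only.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: a writer that never finishes a write, so we can
// inspect how much data the stream considers itself to be holding.
function makeStuckWriter(objectMode) {
  return new Writable({
    objectMode,
    highWaterMark: 4,
    write(chunk, encoding, next) {
      // next() is deliberately never called
    },
  });
}

const objectWriter = makeStuckWriter(true);
const byteWriter = makeStuckWriter(false);

// The same two 2-character strings are written to each stream.
const objectResults = [objectWriter.write('ab'), objectWriter.write('cd')];
const byteResults = [byteWriter.write('ab'), byteWriter.write('cd')];

console.log(objectResults, objectWriter.writableLength); // [ true, true ] 2
console.log(byteResults, byteWriter.writableLength);     // [ true, false ] 4
```

In object mode the two strings count as 2 items against a watermark of 4. In byte mode they count as 4 bytes, so the second write already hits the watermark.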

Demo - Fast reading, fast writing

Execute: node 01_indexSync.js
Example results: ./results/01_indexSync.log

The first simple demo shows what happens when the reader and the writer are not IO bound: they are totally synchronous and there are no callbacks or promises involved. The results can be seen in 01_indexSync.log.

Even though the high watermark is set to 2 in each stream there is only ever one message in any of the streams at once. This is because the processing of the message within each stream is CPU bound and so gives the event loop no time to do some other work, including pulling the next message into a buffer.

Notice that it’s the reader that ends the process by sending a null message through the system when it has finished the set number of reads. If you were to comment out the push(null) statement, the transform and write streams would not know when to finish and so wouldn't execute the _final method.
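
The role of push(null) can be seen in a stripped-down sketch like the one below. It is my own minimal example rather than the repo code, and uses the simplified constructor options (read, write, final) instead of classes.

```javascript
const { Readable, Writable } = require('stream');

const events = [];
let sent = 0;

const reader = new Readable({
  objectMode: true,
  read() {
    sent += 1;
    // After two messages, push(null) tells downstream there is nothing left.
    this.push(sent <= 2 ? sent : null);
  },
});

const writer = new Writable({
  objectMode: true,
  write(message, encoding, next) {
    events.push(`write ${message}`);
    next();
  },
  // final() only runs once the writer has been told the input has ended.
  final(done) {
    events.push('final');
    done();
  },
});

const finished = new Promise((resolve, reject) => {
  writer.on('error', reject);
  writer.on('finish', () => {
    events.push('finish');
    resolve(events);
  });
});

reader.pipe(writer);

finished.then((seen) => console.log(seen));
// [ 'write 1', 'write 2', 'final', 'finish' ]
```

Replacing the null with another message would keep the reader going, and neither 'final' nor 'finish' would ever be recorded.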

Demo - Back pressure

Execute: node 02_indexBackPressure.js
Example results: ./results/02_indexBackPressure.log

Here the writer has not been wired into the pipe and so this gives the opportunity to see what occurs when the reader reads until all the buffers are full. The high watermark is set to 5 for each stream so you can see that 15 messages are read, 5 of which are transformed to fill the write buffer of the transform stream. 5 will be stuck in the read buffer of the transform stream and 5 will be in the outgoing buffer of the read stream. At this point all the streams are paused and since there is no writer to trigger a drain then there are no more callbacks to process and the process ends.

A selection of the log is shown below with a little removed to make for a clearer example, see the full log in the repo. Notice how on lines 23 and 33 a zero is returned indicating that the stream is going to pause.

++ 1 Read        <----+
++ 2 Read             |
++++ 1 Transform      |
++ 3 Read             | These 5 messages went straight
++++ 2 Transform      | through the _transform method
++ 4 Read             | and into the write buffer of
++++ 3 Transform      | the transform stream
++ 5 Read             |
++++ 4 Transform      |
++ 6 Read          <----+
++++ 5 Transform <----+ |
++ 7 Read               |
++ 8 Read               | These 5 messages are in the
++ 9 Read               | Transform streams read buffer
++ 10 Read       <------+
++ 11 Read       <----+
++ 12 Read            | These messages are in the
++ 13 Read            | read streams outgoing buffer
++ 14 Read            |
++ 15 Read       <----+
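
A cut-down version of this experiment might look like the sketch below. It is an approximation of the demo, not the repo code, and the exact number of reads it reports can differ between node versions (the log above came from the version I was running at the time). What should hold everywhere is that the reader stops long before it has produced all 100 messages.

```javascript
const { Readable, Transform } = require('stream');

const TOTAL = 100; // far more messages than the buffers can hold
const reads = [];
const transforms = [];

let id = 0;
const reader = new Readable({
  objectMode: true,
  highWaterMark: 5,
  read() {
    id += 1;
    if (id > TOTAL) {
      this.push(null);
      return;
    }
    reads.push(id);
    this.push(id);
  },
});

const transformer = new Transform({
  objectMode: true,
  highWaterMark: 5,
  transform(message, encoding, next) {
    transforms.push(message);
    next(null, message);
  },
});

// No writer is attached, so nothing ever drains the transform stream.
reader.pipe(transformer);

// Report how far things got once the buffers have had time to fill.
setTimeout(() => {
  console.log(`read ${reads.length}, transformed ${transforms.length} of ${TOTAL}`);
}, 50);
```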


Demo - Slow writing

Execute: node 03_indexSyncSlow.js
Example results: ./results/03_indexSyncSlow.log

Here I have added a slow write operation to the writer, so this now operates in the same manner as the walk-through I did above titled 'Backpressure, drain and resume'. The highWaterMark is set to 2 for all streams, and we are processing 10 messages. The results of running this can be seen here (again I've removed some log lines for brevity).

++++++++++ READ Start - Tue Mar 06 2018 18:00:46 GMT+0000 (STD)
++ 1 Read
++ 2 Read
++++ 1 Transform
++++++ 1 Write   <-- 1st message goes into the _write method and starts slow processing
++ 3 Read
++++ 2 Transform <-- 2nd message goes into the write streams buffer
++ 4 Read
++++ 3 Transform <-- 3rd message goes into the transforms stream write buffer
++ 5 Read        <-- 5th message goes into the transform stream read buffer
++++ 4 Transform <-- 4th message goes into the transforms stream write buffer
++ 6 Read        <-- 6th message goes into the transform stream read buffer
++ 7 Read               <-- Stuck in the outgoing read stream buffer
++ 8 Read               <-- Stuck in the outgoing read stream buffer

 At this point all the buffers are full until the writer finishes

------ 1 Write finished
++++++ 2 Write
------ 2 Write finished <-- Write stream is now empty, there is room for 2
                            messages in its buffer. Calling drain will result
                            in 2 messages pushed in from the transform buffer.
++++ 5 Transform        <-- The transform streams write buffer is now empty, so
                            a drain event is called causing 2 messages to be pulled
                            through the _transform method
++++++ 3 Write
++++ 6 Transform <-- Both the messages that were in the transform stream read buffer have
                     been processed, drain causes the 2 messages in the read stream
                     buffer to be pulled into the transform read buffer.
++ 9 Read
++ 10 Read
------ 3 Write finished <-- From here the whole process will start again
++++++ 4 Write
------ 4 Write finished
...
...


Demo - Batch reading

Execute: node 04_indexBatchRead.js
Example results: ./results/04_indexBatchRead.log

I've been implying that the buffers in the streams are finite in nature, that they can only hold a given number of messages. This is not correct, and is where the hosepipe analogy starts to break down. In example 03_ a high watermark of 2 was used. When the buffer becomes full, any message pushed into the stream results in false being returned to the origin that is pushing data into the stream; this back pressure resulted in exactly the 'correct' number of messages in each of the buffers.

When you run this example you will see a long output, which I have included below with many parts cut out. The interesting parts are denoted with a number and referenced below:

++ 1 Read                        <---- #001
++ 1.01 Read Batch push:1
++ 1.02 Read Batch push:1
++ 1.03 Read Batch push:1
++ 1.04 Read Batch push:0
++ 1.05 Read Batch push:0
++ 2 Read                        <---- #002
++ 2.01 Read Batch push:0
...
++ 2.05 Read Batch push:0
++++ 1.01 Transform              <---- #003
++++++ 1.01 Write - SLOW         <---- #004
++++ 1.01 Transform - push:1
++++ 1.01 Transform - next:0
...
++++ 2.01 Transform - next:0     <---- #005
++ 3 Read                        <---- #006
++ 3.01 Read Batch push:0
...
++ 3.05 Read Batch push:0
++++ 2.02 Transform              <---- #007
++++ 2.03 Transform
++ 4 Read                        <---- #008
++ 4.01 Read Batch push:0
...
++ 4.05 Read Batch push:0
------ 1.01 - 1:1 Write finished <---- #009
++++++ 1.02 Write
...
------ 1.04 - 1:4 Write finished
++++ 2.04 Transform
++++++ 1.05 Write
++++ 2.05 Transform
++++ 3.01 Transform
++++ 3.02 Transform
------ 1.05 - 1:5 Write finished
...
------ 2.03 - 1:8 Write finished
++++ 3.03 Transform
++++++ 2.04 Write
++++ 3.04 Transform
++++ 3.05 Transform
++++ 4.01 Transform
++ 5 Read
++++++++++ READ End - Wed Mar 07 2018 08:06:17 GMT+0000 (STD)
++++ 4.01 Transform - next:0
------ 2.04 - 1:9 Write finished
...
++++ 4.05 Transform
++++++++++ TRANSFORM Final - Wed Mar 07 2018 08:06:19 GMT+0000 (STD)
------ 3.03 - 1:13 Write finished
...
------ 4.05 - 1:20 Write finished
++++++++++ WRITE Final - Wed Mar 07 2018 08:06:28 GMT+0000 (STD)


#001
When the program starts a read event is fired, causing the first batch of 5 messages to be sent into the read buffer. 4 of these immediately go into the transform stream's read buffer and the next batch of 5 is read (#002). Now 7 messages are in the read buffer.
#003
Next the transform stream processes one message which flows immediately into the slow write operation #004.
#005
Then 5 messages flow through the transform stream, filling the write buffer and then partly filling the transform stream's outgoing buffer (remember the buffers have a high watermark set to 4). So now we have 4 messages in the write stream (of which one is currently processing) and 1 in the transform stream's out buffer. In the meantime all the remaining 4 messages in the read buffer have now moved to the transform stream's read buffer. The outgoing buffer in the read stream is now empty.
#006
Another 5 messages are sent into the read stream's buffer.
#007
Another 2 go through the transform stream. This now makes 4 messages in the write stream, 4 in the out buffer of the transform stream, 4 messages in the input buffer of the transform stream and 3 in the read stream's buffer, making 15 messages in total.
#008
Given that the writer is taking a long time and all the buffers are almost full, the only thing left to do is fill the read stream's outgoing buffer, so another batch of 5 is pulled in, making 8 in the read stream buffer. There are now 20 messages in the system but not one has been written yet. And remember, the high watermarks are set to 4 and there are 4 buffers, so technically there is room for only 16 messages.
#009
Finally the slow write finishes and so messages flow through the system. The write buffer is the first to be emptied one message at a time, only then do messages move between the queues.

I will leave it as an exercise for the reader to walk through the remaining log, but do note the position of the 'READ End' log message. It is a long way down the list, appearing only after all the messages have been drained from the read queue. Message 4.01 is the 16th message, so the transform read buffer now has 4 messages in it. The read stream buffer is empty and so it tries to read again, but the code sends in a null message indicating there is nothing left to read and so the read stream ends.
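
The push results seen at #001 (three 1s followed by two 0s) can be reproduced in isolation. This is a minimal sketch with a no-op read function and messages pushed by hand to imitate a batch arriving; it is not the repo's reader.

```javascript
const { Readable } = require('stream');

// read() is a no-op here; the batch is pushed in from outside,
// much as a batch of messages would arrive from a queue.
const reader = new Readable({
  objectMode: true,
  highWaterMark: 4,
  read() {},
});

const pushResults = [];
for (let n = 1; n <= 5; n += 1) {
  pushResults.push(reader.push(`1.0${n}`));
}

console.log(pushResults);           // [ true, true, true, false, false ]
console.log(reader.readableLength); // 5
```

The fourth push reaches the high watermark of 4 and returns false, yet the fifth message is still accepted. The buffer now holds 5 messages, one more than the watermark suggests.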

Demo - Asynchronous writing

Alright, let's finally get to the point: how do we make our synchronous stream processing faster? Well, for this exercise we have a slow write, so let's look at parallelising that by allowing multiple messages to be processed at any one time.

Execute: node 05_indexAsync.js
Example results: ./results/05_indexAsync.log

In this code base we are going to send 20 messages through the system. The buffers are all set to 2 but the maximum concurrency of the write operations is set to 10. If you look at the code inside the file writeItAsync.js you will see that for every write that is performed a callback is set up on a given interval. This callback ultimately calls nextCallBack(), which makes a decision, based on a closure over a nextUsed flag and the maxConcurrency value, as to whether or not next should be called yet. Next is only called if there are currently fewer than 10 callbacks waiting on the event loop.

Once there are 10 on the loop, only the completion of the last message to enter the writer will unblock it and allow more messages in. This is because for any given call to _write you can only call next() once.
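
The idea can be sketched roughly as follows. This is my own simplified reconstruction of the pattern described above, not the contents of writeItAsync.js: the class name ConcurrentWriter, the slowWrite function and the _final handling (which holds back 'finish' until in-flight writes complete) are all assumptions made for the example.

```javascript
const { Readable, Writable } = require('stream');

class ConcurrentWriter extends Writable {
  constructor(maxConcurrency, slowWrite) {
    super({ objectMode: true, highWaterMark: 2 });
    this.maxConcurrency = maxConcurrency;
    this.slowWrite = slowWrite; // hypothetical async work, returns a promise
    this.inFlight = 0;
    this.peak = 0;
    this.finished = [];
    this.onIdle = null;
  }

  _write(message, encoding, next) {
    let nextUsed = false;
    // next() may be called only once per _write call, hence the flag.
    const nextCallback = () => {
      if (!nextUsed && this.inFlight < this.maxConcurrency) {
        nextUsed = true;
        next();
      }
    };

    this.inFlight += 1;
    this.peak = Math.max(this.peak, this.inFlight);
    this.slowWrite(message)
      .then(() => {
        this.inFlight -= 1;
        this.finished.push(message);
        nextCallback(); // no-op unless this write hit the limit earlier
        if (this.inFlight === 0 && this.onIdle) this.onIdle();
      })
      .catch((err) => this.destroy(err));

    nextCallback(); // spare capacity: ask for the next message straight away
  }

  // Assumption: delay 'finish' until every in-flight write has completed.
  _final(done) {
    if (this.inFlight === 0) done();
    else this.onIdle = done;
  }
}

const slowWrite = () =>
  new Promise((resolve) => setTimeout(resolve, 5 + Math.random() * 20));

const writer = new ConcurrentWriter(3, slowWrite);
const allWritten = new Promise((resolve, reject) => {
  writer.on('finish', () => resolve(writer));
  writer.on('error', reject);
});

Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).pipe(writer);

allWritten.then((w) => {
  console.log(`peak in flight: ${w.peak}, finished: ${w.finished.length}`);
});
```

Once the limit is hit, only the message that hit it still holds an unused next. However many of the others finish, nothing new enters the writer until that one message completes.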

I appreciate that this can look a bit complicated in writing, so let's look at the log.

++ 1 Read push:1                                            <---- #001
++ 2 Read push:0
++++ 1 Transform
++++++ 1 Write
      1 - 1:0 Write, Calling next()
++ 3 Read push:0
++++ 2 Transform
...
++++ 9 Transform
++++++ 9 Write
      9 - 9:0 Write, Calling next()
++ 11 Read push:0
++++ 10 Transform
++++++ 10 Write
      10 - 10:0 Write, Max concurrency reached             <---- #002
++ 12 Read push:0
++++ 11 Transform
++ 13 Read push:0
++++ 12 Transform
++ 14 Read push:0
++++ 13 Transform
++ 15 Read push:0
++ 16 Read push:0
++ 17 Read push:0                                           <---- #003
------ 3 - 9:1 Write finished in 176                        <---- #004
      3 - 9:1 Write, Next used
...                                                         <---- #005
------ 10 - 5:5 Write finished in 660
      10 - 5:5 Write, Calling next()                       <---- #006
++++++ 11 Write
      11 - 6:5 Write, Calling next()
++++ 14 Transform
++++++ 12 Write
      12 - 7:5 Write, Calling next()
++++ 15 Transform
++ 18 Read push:0
++ 19 Read push:0
++++++ 13 Write - SLOW
      13 - 8:5 Write, Calling next()
++++ 16 Transform
++++++ 14 Write
      14 - 9:5 Write, Calling next()
++++ 17 Transform
++ 20 Read push:0
++++++++++ READ End - Thu Mar 08 2018 08:40:25 GMT+0000 (STD)
++++++ 15 Write
      15 - 10:5 Write, Max concurrency reached            <---- #007
++++ 18 Transform
------ 7 - 9:6 Write finished in 699                       <---- #008
...
      14 - 3:12 Write, Next used
------ 15 - 2:13 Write finished in 808                     <---- #009
      15 - 2:13 Write, Calling next()
++++++ 16 Write
      16 - 3:13 Write, Calling next()
...
++++++++++ TRANSFORM Final - Thu Mar 08 2018 08:40:26 GMT+0000 (STD)
++++++++++ WRITE Final - Thu Mar 08 2018 08:40:26 GMT+0000 (STD)
------ 6 - 6:14 Write finished in 1518
...


#001 - #002
In the first part of the log messages are read from the queue and flow straight through the transformer into the writer. 10 messages make this journey before the max concurrency limit in the writer is hit at which point we have 10 messages in the writer all being processed.

#002 - #003
Message 11 will flow into the write stream's incoming buffer (message 10 is the one currently being 'tracked' by the writer; it's message 10 that is occupying the other position in the buffer).
Messages 12 and 13 go through the transformer and sit in the outgoing queue of the transform stream.
Messages 14 and 15 move into the transform stream's incoming buffer and finally messages 16 and 17 are in the outgoing buffer of the read stream.

#004
The process now waits until a write completes. In this case it could be any of the 10 messages currently being processed, but it is message 3 that finishes first. Notice that this message being completed does not cause any movement in the buffers, and does not cause a new message to be pulled into the _write method of the write stream.

#005 - #006
In this particular run 4 messages complete before message 10 does, leaving 5 in the event loop (check the full logs for details). This is because of the random processing time in the writer's async processing. When message 10 finishes the next function is called (#006). Remember that the processing of each message can only call next once, so only on completion of the last message into the queue can next be called, which causes more messages to be pulled into the _write function.

#007
Between #006 and #007, 5 more messages are pulled through the buffers and sent for processing in the write stream. You can see messages 14, 15, 16 and 17 are pulled through the transform stream. Messages 11, 12, 13, 14 and 15 are now processing in the write stream (message 10 has already finished). Messages 18, 19 and 20 are all read from the read stream.

At this point we have the following situation (you will need to look at the full logs to see this in detail):
  • Finished processing: 3, 1, 5, 9, 10
  • Currently Processing in the Write stream: 2, 4, 6, 7, 8, 11, 12, 13, 14, 15
  • Write stream incoming buffer: 16
  • Transform stream outgoing buffer: 17, 18
  • Transform stream incoming buffer: 19, 20
  • Read stream outgoing buffer: <empty>
This can be seen by the 2 logs for 17 and 18 showing that they have gone through the transform stream

#008
We can see it's message 7 that completes next, but again this does not cause more messages to move. We need to wait for message 15 to complete for next to be called, as that is the last into the write stream.

#008 - #009
8 messages then finish processing with message 15 being the last. You can see that when message 15 completes next() is called causing more messages to go into the write stream for processing.

Analysis

This is better than the other situations, we have multiple messages being processed at the same time here, but there is a strange pulsing to the messages going through the writer. Only when the last message into the write completes do more messages start to be processed by the writer. Using this technique we will have 5 or fewer messages currently being processed, but if that last message into the writer takes a long time to complete the overall throughput will slow until that message completes.

This method was the best I could manage whilst still using the class based stream API to pipe messages into the slow writer. But is there another way? Maybe yes there is.

Demo - Event emitters in the writer

The problem with the previous async model was that it only allowed a certain number of messages to be pulled in to the writer from the transformer, and only when the last message completes can more messages be pulled in. This pull system has limitations as we have seen. There is another way, rather than pull messages, let them be pushed as they become available.

This leads us to utilise the built-in capabilities of event emitters. We will listen to the 'data' event of the transformer, then kick off an async operation every time data is available. But we only want to process so much at once, otherwise, if the writer is slower than the reader (which it is), resource consumption will go through the roof and eventually overwhelm the limits on the process (ask us how we know). To do this we will pause and resume the transform stream as the limits inside the writer are met.

Execute: node 06_indexEventEmitters.js
Example results: ./results/06_indexEventEmitters.log

++ 1 Read
++ 2 Read
++++ 1 Transform
++++++ 1 Write
           1:0 Write resumed             <---- #001
++ 3 Read
...
++ 11 Read
++++ 10 Transform
++++++ 10 Write
           10:0 Write pausing            <---- #002
++ 12 Read
++++ 11 Transform
++ 13 Read
++++ 12 Transform
++ 14 Read
++ 15 Read
++ 16 Read                                <---- #003
------ 6 - 9:1 Write finished in 150, resumed
++++ 13 Transform
++++++ 11 Write
           10:1 Write pausing
------ 8 - 9:2 Write finished in 209, resumed
++++ 14 Transform
++ 17 Read
++ 18 Read
++++++ 12 Write
           10:2 Write pausing
------ 10 - 9:3 Write finished in 335, resumed
...
++++++ 17 Write - SLOW
           10:7 Write pausing
------ 11 - 9:8 Write finished in 624, resumed
++++ 20 Transform
++++++ 18 Write
           10:8 Write pausing
++++++++++ TRANSFORM Final - Thu Mar 15 2018 17:33:44
------ 1 - 9:9 Write finished in 840, resumed
++++++ 19 Write
           10:9 Write pausing
------ 3 - 9:10 Write finished in 842, resumed
++++++ 20 Write
           10:10 Write pausing
++++++++++ WRITE Final - Thu Mar 15 2018 17:33:44
------ 5 - 9:11 Write finished in 889, resumed  <---- #004
------ 15 - 8:12 Write finished in 420, resumed
...
------ 14 - 1:19 Write finished in 1405, resumed
------ 17 - 0:20 Write finished in 1758, resumed


#001
Every time we see 'Write resumed' or 'Write pausing' there are two numbers preceding it: the number currently in progress and the number finished processing. If the number in progress is less than 10 (the concurrency) then resume is called on the transform stream, making sure that more messages are pushed into the writer. Looking through the logs you can see that the number in progress (the first number) climbs to ten, then oscillates between nine and ten until there are no more messages in the transform queue (#004), at which point it starts to drop to zero as the remaining messages finish processing.

#002 - #003
There are now ten messages being processed by the writer. Another 6 are pulled in to fill the 3 buffers (2 messages in each of the read buffer, incoming and outgoing transform stream buffers)

#003
From here on messages are pushed into the writer as it pauses and resumes the transform stream. The number of messages in process in the async write process will remain at nine or ten for the remainder of the run until all messages have been pushed through the system.

Code

If you were to analyse the code and the differences between 05_indexAsync and 06_indexEventEmitters you will see that 06 does not have a class as a write stream, rather it uses event emitters and listens to these events coming from the transform stream. For each message it will kick off an async processing block.

The effect of this model is that the completion of any write message can cause more messages to flow into the write functions, whereas in the previous code only the last message into the write stream could do that.
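
In outline, the listener-based approach could look like the sketch below. It is a simplified illustration under my own assumptions, not the code in 06_indexEventEmitters.js: consumeWithLimit and slowWrite are made-up names, and a plain readable stands in for the transform stream.

```javascript
const { Readable } = require('stream');

// Listens to 'data' on the source and keeps at most maxConcurrency
// async writes in flight by pausing and resuming the source.
function consumeWithLimit(source, maxConcurrency, slowWrite) {
  return new Promise((resolve, reject) => {
    const stats = { inFlight: 0, peak: 0, finished: 0 };
    let sourceEnded = false;

    source.on('data', (message) => {
      stats.inFlight += 1;
      stats.peak = Math.max(stats.peak, stats.inFlight);
      if (stats.inFlight >= maxConcurrency) source.pause();

      slowWrite(message).then(() => {
        stats.inFlight -= 1;
        stats.finished += 1;
        if (sourceEnded && stats.inFlight === 0) resolve(stats);
        else if (!sourceEnded) source.resume(); // any completion lets more in
      }, reject);
    });

    source.on('end', () => {
      sourceEnded = true;
      if (stats.inFlight === 0) resolve(stats);
    });
    source.on('error', reject);
  });
}

// Hypothetical slow write taking a random 5-25ms.
const slowWrite = () =>
  new Promise((resolve) => setTimeout(resolve, 5 + Math.random() * 20));

const messages = Array.from({ length: 20 }, (_, i) => i + 1);
const consumed = consumeWithLimit(Readable.from(messages), 5, slowWrite);

consumed.then((stats) => {
  console.log(`peak in flight: ${stats.peak}, finished: ${stats.finished}`);
});
```

The difference from the piped writer is in the completion handler: every finished write calls resume(), so a single slow message no longer holds up the rest.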

Comparing the relative speed of the different approaches

I've done some comparisons for the different approaches I have gone through above. Each test processes messages with a high watermark set to 10. I've varied the number of messages processed and the maximum write concurrency to give some comparison.



Run #  File name              Async  Time taken  # messages  Write concurrency
3.1    03_indexSyncSlow       N      69s         100         1
4.1    04_indexBatchRead      N      68s         100         1
5.1    05_indexAsync          N      67s         100         1
5.2    05_indexAsync          Y      10s         100         10
5.3    05_indexAsync          Y      95s         1000        10
5.4    05_indexAsync          Y      14s         1000        100
6.1    06_indexEventEmitters  N      68s         100         1
6.2    06_indexEventEmitters  Y      8s          100         10
6.3    06_indexEventEmitters  Y      72s         1000        10
6.4    06_indexEventEmitters  Y      9s          1000        100

Clearly the async versions were going to be quicker than the synchronous versions and it's interesting to see that when write concurrency is set to 1 pretty much all of them take the same 68 seconds. I'm attributing the deviation to the randomised wait time within the writers.

The results that interest me the most though are the comparisons between 5.3 and 6.3, asynchronously processing 1000 messages with a concurrency of 10, and then 5.4 and 6.4, processing 1000 messages with a concurrency of 100. Using event emitters (6.3 and 6.4) was roughly 25% and then 35% quicker than using stream pipes (5.3 and 5.4). Again this is not too surprising, given the concurrency in the event emitter version is maintained at the configured maximum (10 or 100) for the length of the entire run, whereas in the piped version the number of messages being processed by the writer only gets topped up when the last message in completes.

If you run the same tests yourself pay attention to the logs of 05_indexAsync which show the number of messages currently being processed, and see how it pulses up and down as the last message finishes (especially when the concurrency is high, try 100 or more).
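
For anyone wanting to check the percentages against their own runs, the arithmetic is simply the reduction in run time relative to the piped version. A small sketch using the timings from the table above (percentQuicker is a made-up helper name):

```javascript
// Timings in seconds for the four runs being compared.
const timings = {
  '5.3': 95, // piped, 1000 messages, concurrency 10
  '6.3': 72, // event emitters, 1000 messages, concurrency 10
  '5.4': 14, // piped, 1000 messages, concurrency 100
  '6.4': 9,  // event emitters, 1000 messages, concurrency 100
};

// Percentage reduction in run time when going from `before` to `after`.
function percentQuicker(before, after) {
  return Math.round(((before - after) / before) * 100);
}

console.log(percentQuicker(timings['5.3'], timings['6.3'])); // 24
console.log(percentQuicker(timings['5.4'], timings['6.4'])); // 36
```

These single runs work out at about 24% and 36%, in line with the rounded figures quoted above. With randomised write times I would expect the numbers to move around a little from run to run.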

Conclusion

Like I said at the beginning of this article, I didn't really set out to prove anything with this exercise, more just to get a fuller understanding of how the mechanics of streams work in node. I did this with a view to getting better performance out of the node processes that we employ to process messages off queues, perform business logic on that data and load data into a database. During this investigation/journey there have been lots of learnings, and these have shown that we were using streams correctly but inefficiently. We operate our server side processing on a model similar to 04_indexBatchRead, batching the read from SQS but processing messages in the writer one at a time. I've tried to capture these learnings in code on the github repo found at https://github.com/DamianStanger/streams101

I hope that I managed to impart some of my learnings on to you in the process.

Appendix / references

My code and results - https://github.com/DamianStanger/streams101
Node stream API - https://nodejs.org/dist/latest-v9.x/docs/api/stream.html
Back pressure - https://nodejs.org/en/docs/guides/backpressuring-in-streams/
Lifecycle of a pipe - https://nodejs.org/en/docs/guides/backpressuring-in-streams/#lifecycle-of-pipe
Node event emitters - https://nodejs.org/dist/latest-v9.x/docs/api/events.html

Footnote

I would really appreciate any feedback on this article. Positive comments and constructive feedback are equally welcome as I'm still learning, and I would hate it if there are conclusions drawn that are inaccurate or just plain wrong. Thanks in advance. Do try out the code examples, it will help cement the concepts described in this article.