Sunday 25 March 2018

Understanding the node stream API, through examples and practice

We use node streams extensively in my day job, making heavy use of the competing consumers pattern to process messages from AWS queues. We pull messages off SQS, then use the node stream API to process them through various stages until we write to a mongo DB and/or issue events that land on other SQS queues, usually via SNS.

I've been looking into how we can gain more performance out of our existing codebase, looking into what can be tweaked and what needs rewriting. It's forced me to become quite intimate with node's stream API, how buffers really work and what effect the highWaterMark has on them. I've also looked at how we could use event emitters rather than streams to make better use of node's asynchronous nature.

In this post I aim to impart some of my recent knowledge, for my own future reference but also because, having polled many devs on the subject, there are a number of misconceptions about how streams in node actually work.

Analogies

Analogies are useful. I have always thought of streams as a pipe that data flows through, like a hosepipe with water. As with most analogies it is useful but will only take you so far.

__-0_
_____==========|====================|==========
Tap   Reader       Transformer        Writer


A hose has a tap connected at one end (a reader), some length of pipe in the middle (transformers) and an opening at the other to let the water out of the pipe (a writer). A hosepipe will fill with water when you turn on the tap until it is full, and then it will start to issue water from the open end. If a real tap is switched off, the water stops flowing at the open end with water still in the pipe. So instead imagine a hose that is dangling vertically, still attached to the tap: if the water is switched off, the water in the pipe will all drain out due to gravity. This is more like a stream in node.

Buffers

__-0_                      _
_____==========|==========|x|==========|==========
Tap   Reader         Transformer        Writer


But the water in the hose is not all being processed at the same time. Imagine 3 parts to the hose all connected together, each one capable of holding a certain amount of water; this is analogous to buffers. In the centre of the middle pipe (the transformer) you have a box. A unit of water flows into the box, then out, but only one unit of water can pass through at once. This is in fact how a transform stream works. Take a compression stream for example: one piece of data goes in, it's compressed, and the compressed data comes out ready to be written by the writer.

So you now have 4 sections of pipe, each capable of containing some water; these are the buffers. The reader has a buffer on its outgoing side, the transformer has 2 buffers (one read buffer and one write buffer), and the writer has one incoming buffer. If the highWaterMark is set to 5 for all streams you have room for 20 units of water (this is not the whole story, as I will explain later, but like I said it's a useful analogy).

A stream's highWaterMark setting governs the desired amount of data to be in the buffer at any one time. It is not a limit, as we will show later, but more a guide that tells the stream when to pause the flow of data into the buffer.
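
To see that the high watermark is a guide rather than a hard limit, here is a minimal sketch (it is not taken from the repo; the slowWriter name and the 50ms delay are assumptions made purely for illustration). It writes 4 messages into an object mode writable with a highWaterMark of 2 and records what write() returns each time.

```javascript
const { Writable } = require('stream');

// A deliberately slow writer: each message takes 50ms to "write".
const slowWriter = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(message, encoding, next) {
    setTimeout(next, 50);
  }
});

// Write 4 messages back to back without waiting for anything.
const results = [];
for (let id = 1; id <= 4; id++) {
  results.push(slowWriter.write({ id }));
}

// write() returns false once the high watermark is reached,
// but the messages are still accepted and buffered.
console.log(results);                   // [ true, false, false, false ]
console.log(slowWriter.writableLength); // 4
```

The false return value is only a request to stop writing. Nothing prevents the caller from ignoring it, which is why the buffer can grow past the high watermark.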

Classes and functions of the stream API


 _                       _                       _
|R|==========|==========|T|==========|==========|W|
    Reader         Transformer         Writer


When you implement streams you need to implement a function in each of the Reader, Transformer and Writer. The function names are quite self-explanatory: _read, _transform and _write respectively. These functions are called when data flows through the pipe, which all happens once the streams are piped together thus:

readStream.pipe(transformStream).pipe(writeStream);

Once the streams are piped together, any data flowing into the read stream will be pulled down the pipe by the stream API, which will handle any buffering needed. What is key right now is that only one piece of information can flow through any given function at once; they are NOT asynchronous out of the box. This makes sense, as you would not want a transform stream that is compressing or encrypting data to process multiple chunks simultaneously and potentially get the zipped chunks mixed up in the write buffer.
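
As a rough sketch of what those three functions look like in practice, the following defines a reader, a transformer and a writer in object mode and pipes them together. The class names (CountingReader, Doubler, CollectingWriter) and the doubling logic are hypothetical and are not the classes used in the repo.

```javascript
const { Readable, Transform, Writable } = require('stream');

class CountingReader extends Readable {
  constructor(total) {
    super({ objectMode: true, highWaterMark: 2 });
    this.total = total;
    this.count = 0;
  }
  _read() {
    this.count += 1;
    // Pushing null tells the rest of the pipe there is nothing left to read.
    this.push(this.count <= this.total ? { id: this.count } : null);
  }
}

class Doubler extends Transform {
  constructor() {
    super({ objectMode: true, highWaterMark: 2 });
  }
  _transform(message, encoding, next) {
    // The next message is not handed to _transform until next() is called.
    next(null, { id: message.id, value: message.id * 2 });
  }
}

class CollectingWriter extends Writable {
  constructor() {
    super({ objectMode: true, highWaterMark: 2 });
    this.written = [];
  }
  _write(message, encoding, next) {
    this.written.push(message.value);
    next();
  }
}

const readStream = new CountingReader(3);
const transformStream = new Doubler();
const writeStream = new CollectingWriter();

writeStream.on('finish', () => console.log('written:', writeStream.written));
readStream.pipe(transformStream).pipe(writeStream);
// prints: written: [ 2, 4, 6 ]
```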

Now because there can be only one piece of data flowing through the functions at once the buffers will fill up if the rate of data coming into the pipe is greater than the rate of data flowing out of the pipe. Usually this is true, for example reading from a file is generally quicker than writing to another.

Data flow


If we look at what happens when a single message is inserted into the pipe, you will see it do this:
 _                     _                    _
|1|=========|=========| |=========|========| |
 _                     _                    _
| |=========|=========|1|=========|========| |
 _                     _                    _
| |=========|=========| |=========|========|1|


If the write is slow and 1 more message was to be read this is what would happen (for the sake of this example the buffer (highWaterMark) is set to 1 for all streams).

 _                     _                   _
| |====-====|====-====| |====2====|===x===|1|


Notice how the writer's buffer and the transformer's write buffer are both full (the writer as a whole can only have one message within it; the buffer was full but it's now processing that message). If a 3rd message was to go down the pipe it would get stuck in the transformer's read buffer. It has nowhere to go, so it will not go into the _transform function until there is room in the next buffer.

 _                     _                   _
| |====-====|====3====| |====2====|===x===|1|


Finally, if the reader pumped a 4th message into the pipe it would go into the reader's buffer.

 _                     _                   _
| |====4====|====3====| |====2====|===x===|1|


In this state nothing can flow down the pipe until the writer is finished processing message number one. This is called backpressure, messages back up behind the slowest stream (in this case the writer but it could equally be a transformer).

Backpressure, drain and resume


That was a bit of a simplistic example, let's say the buffers are set to a high watermark of 2 rather than one. A slow write would result in the following situation (again note that the write stream can only have 2 messages in it as a whole, with only one in its buffer).

 _                       _                      _
| |===8==7===|===6==5===| |===4==3===|====2====|1|


At this point technically speaking the transform stream and the read stream are paused and will not resume until their buffers are emptied when a drain event occurs. Let's see what happens when the writer finishes writing [1]

 _                       _                      _
| |===8==7===|===6==5===| |===4==3===|====-====|2|


Notice that more data is not pulled from the transform write buffer yet.

Only when [2] finishes processing will a drain event occur on the write stream, causing it to fill again. There is enough room for 2 pieces of data in the writer: 1 in the buffer and 1 in the slow write operation. So as soon as [2] finishes being written, 3 will be pulled into the write stream.

 _                       _                      _
| |===8==7===|===6==5===| |===-==4===|====3====| |


There is now room in the transform's write buffer so 5 will get pulled through the transform

 _                       _                      _
| |===8==7===|===-==6===|5|===-==4===|====3====| |


3 will then go into the writer and 4 will go into the writer's buffer
 _                       _                      _
| |===8==7===|===-==6===|5|===-==-===|====4====|3|


When 5 finishes its transform, 6 will also go through the transform, causing the transform's read buffer to drain
 _                       _                      _
| |===8==7===|===-==-===| |===6==5===|====4====|3|

7 and 8 will then flow into the transform's read buffer
 _                       _                      _
| |===-==-===|===8==7===| |===6==5===|====4====|3|

This causes the reader's buffer to drain, which will then pull 2 more messages in through the reader
 _                       _                      _
| |==10==9===|===8==7===| |===6==5===|====4====|3|

And we are back to the state we had originally, we are now waiting for the slow write operation to occur on message #3
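
The drain behaviour in the walkthrough above can be observed on a single writable. The sketch below is an illustration only (the writeAll helper and the 20ms delay are assumptions, not code from the repo): it writes until write() returns false, then waits for the 'drain' event before writing again.

```javascript
const { Writable } = require('stream');

const log = [];
const slowWriter = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(message, encoding, next) {
    setTimeout(next, 20); // simulate a slow write
  }
});
slowWriter.on('drain', () => log.push('drain'));

function writeAll(messages, done) {
  let index = 0;
  function writeSome() {
    while (index < messages.length) {
      const message = messages[index++];
      log.push(`write ${message}`);
      if (!slowWriter.write(message)) {
        // The stream is at its high watermark: wait until it has emptied.
        slowWriter.once('drain', writeSome);
        return;
      }
    }
    slowWriter.end(done);
  }
  writeSome();
}

writeAll([1, 2, 3, 4, 5, 6], () => console.log(log.join(', ')));
// prints: write 1, write 2, drain, write 3, write 4, drain, write 5, write 6, drain
```

Note that 'drain' fires only after both messages have been written, not after the first one, which is the pairing effect described in the walkthrough.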

Buffering

So what you can see from the above example is a pulsing action where messages get pulled through in pairs (the current high watermark) once 2 slow writes have occurred. This is due to the way that the backpressure gets released and the buffers fully empty before fully filling up again. Obviously this gets more complex if you have transformers that are also slow, but you will always see messages flowing through the buffers in pairs (when all the streams' high watermarks are set to 2).


Some demos

Firstly I'd like to introduce you to the repo that contains the examples. You can see there are a number of index files; each one is designed to be run with node indexFileName.js and will give a coloured output to the terminal.

https://github.com/DamianStanger/streams101

I aim to produce and demo some simple code that you can then play with, change the high watermark settings and the numbers of messages with a view to allowing you to gain a better understanding of how streams work yourselves.

Object mode

First a quick note that all the examples given below use streams in object mode. This is because in my case I'm investigating how I can get more performance out of my streams when pulling messages off an SQS queue. So to use object mode makes sense but all the concepts will be similar if you're dealing with chunked streams. The exception here is writing in an asynchronous manner, for example you would not want to write multiple chunks to a file stream asynchronously as your chunks would get all mixed up in the resultant file.

Demo - Fast reading, fast writing

Execute: node 01_indexSync.js
Example results: ./results/01_indexSync.log

The first simple demo shows what happens when the reader and the writer are not IO bound: they are totally synchronous and there are no callbacks or promises involved. The results can be seen in 01_indexSync.log.

Even though the high watermark is set to 2 in each stream there is only ever one message in any of the streams at once. This is because the processing of the message within each stream is CPU bound and so gives the event loop no time to do some other work, including pulling the next message into a buffer.

Notice that it's the reader that ends the process by sending a null message through the system when it has finished the set number of reads. If you were to comment out the push(null) statement, the transform and write streams would not know when to finish and so won't execute the _final method.
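
A small sketch of that end-of-stream behaviour, assuming a hypothetical reader and writer built with the options form of the stream constructors rather than the classes in the repo:

```javascript
const { Readable, Writable } = require('stream');

const events = [];

// The reader does nothing on demand; messages are pushed in by hand below.
const reader = new Readable({ objectMode: true, read() {} });

const writer = new Writable({
  objectMode: true,
  write(message, encoding, next) {
    events.push(`write ${message}`);
    next();
  },
  // final runs once the writer has been told there is no more data.
  final(done) {
    events.push('final');
    done();
  }
});

reader.pipe(writer);
reader.push(1);
reader.push(2);
reader.push(null); // remove this line and 'final' is never recorded

writer.on('finish', () => console.log(events));
// prints: [ 'write 1', 'write 2', 'final' ]
```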

Demo - Back pressure

Execute: node 02_indexBackPressure.js
Example results: ./results/02_indexBackPressure.log

Here the writer has not been wired into the pipe and so this gives the opportunity to see what occurs when the reader reads until all the buffers are full. The high watermark is set to 5 for each stream so you can see that 15 messages are read, 5 of which are transformed to fill the write buffer of the transform stream. 5 will be stuck in the read buffer of the transform stream and 5 will be in the outgoing buffer of the read stream. At this point all the streams are paused and since there is no writer to trigger a drain then there are no more callbacks to process and the process ends.

A selection of the log is shown below with a little removed to make for a clearer example, see the full log in the repo. Notice how on lines 23 and 33 a zero is returned indicating that the stream is going to pause.

++ 1 Read        <----+
++ 2 Read             |
++++ 1 Transform      |
++ 3 Read             | These 5 messages went straight
++++ 2 Transform      | through the _transform method
++ 4 Read             | and into the write buffer of
++++ 3 Transform      | the transform stream
++ 5 Read             |
++++ 4 Transform      |
++ 6 Read          <----+
++++ 5 Transform <----+ |
++ 7 Read               |
++ 8 Read               | These 5 messages are in the
++ 9 Read               | Transform streams read buffer
++ 10 Read       <------+
++ 11 Read       <----+
++ 12 Read            | These messages are in the
++ 13 Read            | read streams outgoing buffer
++ 14 Read            |
++ 15 Read       <----+
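
The same situation can be sketched in a few lines. This is not the code from 02_indexBackPressure.js; the counts object and the LIMIT of 1000 are assumptions for illustration. The reader is able to produce 1000 messages, but with no writer attached it stops long before that. The exact counts may differ a little between node versions, so no expected output is shown.

```javascript
const { Readable, Transform } = require('stream');

const counts = { read: 0, transformed: 0 };
const LIMIT = 1000; // far more than the buffers can hold

const readStream = new Readable({
  objectMode: true,
  highWaterMark: 5,
  read() {
    if (counts.read >= LIMIT) {
      this.push(null);
      return;
    }
    counts.read += 1;
    this.push({ id: counts.read });
  }
});

const transformStream = new Transform({
  objectMode: true,
  highWaterMark: 5,
  transform(message, encoding, next) {
    counts.transformed += 1;
    next(null, message);
  }
});

// No writer is piped on the end, so nothing ever drains the transform.
readStream.pipe(transformStream);

// Give the streams a moment to fill their buffers, then report.
setTimeout(() => console.log(counts), 100);
```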


Demo - Slow writing

Execute: node 03_indexSyncSlow.js
Example results: ./results/03_indexSyncSlow.log

Here I have added a slow write operation to the writer, so this now operates in the same manner as the walkthrough I did above titled 'Backpressure, drain and resume'. The high watermark is set to 2 for all streams, and we are processing 10 messages. The results of running this can be seen here (again I've removed some log lines for brevity).

++++++++++ READ Start - Tue Mar 06 2018 18:00:46 GMT+0000 (STD)
++ 1 Read
++ 2 Read
++++ 1 Transform
++++++ 1 Write   <-- 1st message goes into the _write method and starts slow processing
++ 3 Read
++++ 2 Transform <-- 2nd message goes into the write streams buffer
++ 4 Read
++++ 3 Transform <-- 3rd message goes into the transforms stream write buffer
++ 5 Read        <-- 5th message goes into the transform stream read buffer
++++ 4 Transform <-- 4th message goes into the transforms stream write buffer
++ 6 Read        <-- 6th message goes into the transform stream read buffer
++ 7 Read               <-- Stuck in the read stream's outgoing buffer
++ 8 Read               <-- Stuck in the read stream's outgoing buffer

 At this point all the buffers are full until the writer finishes

------ 1 Write finished
++++++ 2 Write
------ 2 Write finished <-- Write stream is now empty, there is room for 2
                            messages in its buffer. Calling drain will result
                            in 2 messages pushed in from the transform buffer.
++++ 5 Transform        <-- The transform streams write buffer is now empty, so
                            a drain event is called causing 2 messages to be pulled
                            through the _transform method
++++++ 3 Write
++++ 6 Transform <-- Both the messages that were in the transform stream read buffer have
                     been processed, drain causes the 2 messages in the read stream
                     buffer to be pulled into the transform read buffer.
++ 9 Read
++ 10 Read
------ 3 Write finished <-- From here the whole process will start again
++++++ 4 Write
------ 4 Write finished
...
...


Demo - Batch reading

Execute: node 04_indexBatchRead.js
Example results: ./results/04_indexBatchRead.log

I've been implying that the buffers in the streams are finite in nature, that they can only hold a given number of messages. This is not correct and is where the hosepipe analogy starts to break down. In example 03_ a high watermark of 2 was used. When the buffer becomes full, any message pushed into the stream results in false being returned to the origin that is pushing data into the stream. This back pressure resulted in exactly the 'correct' number of messages in each of the buffers.
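
A minimal sketch of a buffer growing past its high watermark, assuming a hypothetical reader with a highWaterMark of 4 that has a batch of 6 messages pushed into it regardless of what push() returns:

```javascript
const { Readable } = require('stream');

// read() is a no-op here; the batch is pushed in from outside.
const reader = new Readable({
  objectMode: true,
  highWaterMark: 4,
  read() {}
});

const pushResults = [];
for (let id = 1; id <= 6; id++) {
  pushResults.push(reader.push({ id }));
}

// push() starts returning false at the high watermark,
// yet every message has still been stored in the buffer.
console.log(pushResults);           // [ true, true, true, false, false, false ]
console.log(reader.readableLength); // 6
```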

When you run 04_indexBatchRead.js you will see a long output, which I have included below with many parts cut out. The interesting parts are denoted with a number and referenced below:

++ 1 Read                        <---- #001
++ 1.01 Read Batch push:1
++ 1.02 Read Batch push:1
++ 1.03 Read Batch push:1
++ 1.04 Read Batch push:0
++ 1.05 Read Batch push:0
++ 2 Read                        <---- #002
++ 2.01 Read Batch push:0
...
++ 2.05 Read Batch push:0
++++ 1.01 Transform              <---- #003
++++++ 1.01 Write - SLOW         <---- #004
++++ 1.01 Transform - push:1
++++ 1.01 Transform - next:0
...
++++ 2.01 Transform - next:0     <---- #005
++ 3 Read                        <---- #006
++ 3.01 Read Batch push:0
...
++ 3.05 Read Batch push:0
++++ 2.02 Transform              <---- #007
++++ 2.03 Transform
++ 4 Read                        <---- #008
++ 4.01 Read Batch push:0
...
++ 4.05 Read Batch push:0
------ 1.01 - 1:1 Write finished <---- #009
++++++ 1.02 Write
...
------ 1.04 - 1:4 Write finished
++++ 2.04 Transform
++++++ 1.05 Write
++++ 2.05 Transform
++++ 3.01 Transform
++++ 3.02 Transform
------ 1.05 - 1:5 Write finished
...
------ 2.03 - 1:8 Write finished
++++ 3.03 Transform
++++++ 2.04 Write
++++ 3.04 Transform
++++ 3.05 Transform
++++ 4.01 Transform
++ 5 Read
++++++++++ READ End - Wed Mar 07 2018 08:06:17 GMT+0000 (STD)
++++ 4.01 Transform - next:0
------ 2.04 - 1:9 Write finished
...
++++ 4.05 Transform
++++++++++ TRANSFORM Final - Wed Mar 07 2018 08:06:19 GMT+0000 (STD)
------ 3.03 - 1:13 Write finished
...
------ 4.05 - 1:20 Write finished
++++++++++ WRITE Final - Wed Mar 07 2018 08:06:28 GMT+0000 (STD)


#001
When the program starts a read event is fired, causing the first batch of 5 messages to be sent into the read buffer. 4 of these immediately go into the transform stream's read buffer and the next batch of 5 is read #002. Now 7 messages are in the read buffer.
#003
Next the transform stream processes one message which flows immediately into the slow write operation #004.
#005
Then 5 messages flow through the transform stream, filling the write buffer and then partly filling the transform stream's outgoing buffer (remember the buffers have a high watermark set to 4). So now we have 4 messages in the write stream (of which one is currently processing) and 1 in the transform stream's out buffer. In the meantime all the remaining 4 messages in the read buffer have now moved to the transform stream's read buffer. The outgoing buffer in the read stream is now empty.
#006
Another 5 messages are sent into the read streams buffer
#007
Another 2 go through the transform stream. This now makes 4 messages in the write stream, 4 in the out buffer of the transform stream, 4 messages in the input buffer of the transform stream and 3 in the read streams buffer making 15 messages
#008
Given that the writer is taking a long time and all buffers are almost full, the only thing left to do is fill the read stream's outgoing buffer, so another batch of 5 is pulled in, making 8 in the read stream buffer. There are now 20 messages in the system but not one has been written yet. And remember the high watermarks are set to 4 and there are 4 buffers, so technically there is room for 16 messages.
#009
Finally the slow write finishes and so messages flow through the system. The write buffer is the first to be emptied one message at a time, only then do messages move between the queues.

I will leave it as an exercise for the reader to walk through the remaining log but do note the position of the 'READ End' log message. It is a long way down the list. Only after all the messages have been drained from the read queue. Message 4.01 is the 16th message so the transform read buffer now has 4 messages in it. The read stream buffer is empty and so it tries to read again but the code sends in a null message indicating there is nothing left to read and so the read stream ends.

Demo - Asynchronous writing

Alright, let's finally get to the point: how do we make our synchronous stream processing faster? Well, for this exercise we have a slow write, so let's look at parallelising that by allowing multiple messages to be processed at any one time.

Execute: node 05_indexAsync.js
Example results: ./results/05_indexAsync.log

In this code base we are going to send 20 messages through the system. The buffers are all set to 2 but the maximum concurrency of the write operations is set to 10. If you look at the code inside the file writeItAsync.js you will see that for every write that is performed a callback is set up on a given interval. This callback ultimately calls nextCallBack(), which makes a decision, based on a closure over a nextUsed flag and the maxConcurrency value, as to whether or not next should be called yet. Next is only called if there are currently fewer than 10 callbacks waiting on the event loop.

Once there are 10 on the loop only the completion of the last callback will result in the unblocking of the next loop. This is because for any given processing of a _write you can only call next() once.
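
The idea can be sketched roughly as follows. This is not the contents of writeItAsync.js; the ConcurrentWriter class, the slowOperation helper, its delays and the concurrency of 5 are all assumptions for illustration, and error handling is left out.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical slow operation: resolves after a short, varying delay.
const slowOperation = message =>
  new Promise(resolve => setTimeout(resolve, 5 + (message.id % 4) * 5));

class ConcurrentWriter extends Writable {
  constructor(maxConcurrency) {
    super({ objectMode: true, highWaterMark: 2 });
    this.maxConcurrency = maxConcurrency;
    this.inProgress = 0;
    this.peak = 0;
    this.completed = 0;
  }

  _write(message, encoding, next) {
    // next() may only be called once per _write, so guard it with a flag.
    let nextUsed = false;
    const nextOnce = () => {
      if (!nextUsed) {
        nextUsed = true;
        next();
      }
    };

    this.inProgress += 1;
    this.peak = Math.max(this.peak, this.inProgress);

    slowOperation(message).then(() => {
      this.inProgress -= 1;
      this.completed += 1;
      nextOnce(); // only has an effect for the message that hit the limit
    });

    // Below the limit: ask for the next message straight away.
    if (this.inProgress < this.maxConcurrency) {
      nextOnce();
    }
  }
}

const messages = Array.from({ length: 20 }, (_, i) => ({ id: i + 1 }));
const readStream = new Readable({
  objectMode: true,
  highWaterMark: 2,
  read() {
    this.push(messages.length ? messages.shift() : null);
  }
});

const writeStream = new ConcurrentWriter(5);
readStream.pipe(writeStream);
```

In this sketch the write stream can emit 'finish' while operations are still in flight, because next() has already been called for them.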

I appreciate that this can look a bit complicated in writing, let's look at the log.

++ 1 Read push:1                                            <---- #001
++ 2 Read push:0
++++ 1 Transform
++++++ 1 Write
      1 - 1:0 Write, Calling next()
++ 3 Read push:0
++++ 2 Transform
...
++++ 9 Transform
++++++ 9 Write
      9 - 9:0 Write, Calling next()
++ 11 Read push:0
++++ 10 Transform
++++++ 10 Write
      10 - 10:0 Write, Max concurrency reached             <---- #002
++ 12 Read push:0
++++ 11 Transform
++ 13 Read push:0
++++ 12 Transform
++ 14 Read push:0
++++ 13 Transform
++ 15 Read push:0
++ 16 Read push:0
++ 17 Read push:0                                           <---- #003
------ 3 - 9:1 Write finished in 176                        <---- #004
      3 - 9:1 Write, Next used
...                                                         <---- #005
------ 10 - 5:5 Write finished in 660
      10 - 5:5 Write, Calling next()                       <---- #006
++++++ 11 Write
      11 - 6:5 Write, Calling next()
++++ 14 Transform
++++++ 12 Write
      12 - 7:5 Write, Calling next()
++++ 15 Transform
++ 18 Read push:0
++ 19 Read push:0
++++++ 13 Write - SLOW
      13 - 8:5 Write, Calling next()
++++ 16 Transform
++++++ 14 Write
      14 - 9:5 Write, Calling next()
++++ 17 Transform
++ 20 Read push:0
++++++++++ READ End - Thu Mar 08 2018 08:40:25 GMT+0000 (STD)
++++++ 15 Write
      15 - 10:5 Write, Max concurrency reached            <---- #007
++++ 18 Transform
------ 7 - 9:6 Write finished in 699                       <---- #008
...
      14 - 3:12 Write, Next used
------ 15 - 2:13 Write finished in 808                     <---- #009
      15 - 2:13 Write, Calling next()
++++++ 16 Write
      16 - 3:13 Write, Calling next()
...
++++++++++ TRANSFORM Final - Thu Mar 08 2018 08:40:26 GMT+0000 (STD)
++++++++++ WRITE Final - Thu Mar 08 2018 08:40:26 GMT+0000 (STD)
------ 6 - 6:14 Write finished in 1518
...


#001 - #002
In the first part of the log messages are read from the queue and flow straight through the transformer into the writer. 10 messages make this journey before the max concurrency limit in the writer is hit at which point we have 10 messages in the writer all being processed.

#002 - #003
Message 11 will flow into the write stream's incoming buffer (message 10 is the one currently being 'tracked' by the writer; it's message 10 that is occupying the other position in the buffer).
Messages 12 and 13 go through the transformer and sit in the outgoing queue of the transform stream.
Messages 14 and 15 move into the transform stream's incoming buffer and finally messages 16 and 17 are in the outgoing buffer of the read stream.

#004
The process now waits until a write completes. In this case it could be any of the 10 messages currently being processed, but it is message 3 that finishes first. Notice that this message being completed does not cause any movement in the buffers, and does not cause a new message to be pulled into the _write method of the write stream.

#005 - #006
In this particular run 4 messages complete before message 10 does leaving 5 in the event loop (check the full logs for details). This is because of the random processing time in the writer async processing. When message 10 finishes the next function is called #006, remember that the processing of each message can only call next once so only on completion of the last message into the queue can next be called which causes more messages to be pulled into the _write function.

#007
Between #006 and #007, 5 more messages are pulled through the buffers and sent for processing in the write stream. You can see messages 14, 15, 16 and 17 are pulled through the transform stream. Messages 11, 12, 13, 14 and 15 are now processing in the write stream. Messages 18, 19 and 20 are all read from the read stream.

At this point we have the following situation (you will need to look at the full logs to see this in detail):
  • Finished processing: 3, 1, 5, 9, 10
  • Currently Processing in the Write stream: 2, 4, 6, 7, 8, 11, 12, 13, 14, 15
  • Write stream incoming buffer: 16
  • Transform stream outgoing buffer: 17, 18
  • Transform stream incoming buffer: 19, 20
  • Read stream outgoing buffer: <empty>
This can be seen by the 2 logs for 17 and 18 showing that they have gone through the transform stream

#008
We can see it's message 7 that completes next, but again this does not cause more messages to move. We need to wait for message 15 to complete for next to be called, as that is the last into the write stream.

#008 - #009
8 messages then finish processing with message 15 being the last. You can see that when message 15 completes next() is called causing more messages to go into the write stream for processing.

Analysis

This is better than the other situations, we have multiple messages being processed at the same time here, but there is a strange pulsing to the messages going through the writer. Only when the last message into the write completes do more messages start to be processed by the writer. Using this technique we will have 5 or fewer messages currently being processed, but if that last message into the writer takes a long time to complete the overall throughput will slow until that message completes.

This method was the best I could manage whilst still using the class based stream API to pipe messages into the slow writer. But is there another way? Maybe yes there is.

Demo - Event emitters in the writer

The problem with the previous async model was that it only allowed a certain number of messages to be pulled in to the writer from the transformer, and only when the last message completes can more messages be pulled in. This pull system has limitations as we have seen. There is another way, rather than pull messages, let them be pushed as they become available.

This leads us to utilise the built in capabilities of event emitters. We will listen to the 'data' event of the transformer, then kick off an async operation every time data is available. But we only want to process so much at once, otherwise if the reader is faster than the writer (which it is) resource consumption will go through the roof and eventually overwhelm the limits on the process (ask us how we know). To do this we will pause and resume the transform stream as the limits inside the writer are met.
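
A rough sketch of the pause and resume approach follows. It is not the code from 06_indexEventEmitters.js: the consume function, the stats object, the slowOperation helper and the concurrency of 5 are assumptions, and for brevity it listens to a plain readable rather than a transform stream (any readable side behaves the same way).

```javascript
const { Readable } = require('stream');

// Hypothetical slow operation: resolves after a short, varying delay.
const slowOperation = message =>
  new Promise(resolve => setTimeout(resolve, 5 + (message.id % 4) * 5));

const stats = { inProgress: 0, peak: 0, finished: 0 };

function consume(source, maxConcurrency) {
  source.on('data', message => {
    stats.inProgress += 1;
    stats.peak = Math.max(stats.peak, stats.inProgress);

    // At the limit: stop the source pushing more messages at us.
    if (stats.inProgress >= maxConcurrency) {
      source.pause();
    }

    slowOperation(message).then(() => {
      stats.inProgress -= 1;
      stats.finished += 1;
      // Any completed message frees a slot, so let the source flow again.
      source.resume();
    });
  });
}

const messages = Array.from({ length: 20 }, (_, i) => ({ id: i + 1 }));
const source = new Readable({
  objectMode: true,
  highWaterMark: 2,
  read() {
    this.push(messages.length ? messages.shift() : null);
  }
});

consume(source, 5);
```

The difference from a piped writer is in the completion handler: every finished message calls resume(), so the number in progress is topped up continuously instead of waiting on one particular message.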

Execute: node 06_indexEventEmitters.js
Example results: ./results/06_indexEventEmitters.log

++ 1 Read
++ 2 Read
++++ 1 Transform
++++++ 1 Write
           1:0 Write resumed             <---- #001
++ 3 Read
...
++ 11 Read
++++ 10 Transform
++++++ 10 Write
           10:0 Write pausing            <---- #002
++ 12 Read
++++ 11 Transform
++ 13 Read
++++ 12 Transform
++ 14 Read
++ 15 Read
++ 16 Read                                <---- #003
------ 6 - 9:1 Write finished in 150, resumed
++++ 13 Transform
++++++ 11 Write
           10:1 Write pausing
------ 8 - 9:2 Write finished in 209, resumed
++++ 14 Transform
++ 17 Read
++ 18 Read
++++++ 12 Write
           10:2 Write pausing
------ 10 - 9:3 Write finished in 335, resumed
...
++++++ 17 Write - SLOW
           10:7 Write pausing
------ 11 - 9:8 Write finished in 624, resumed
++++ 20 Transform
++++++ 18 Write
           10:8 Write pausing
++++++++++ TRANSFORM Final - Thu Mar 15 2018 17:33:44
------ 1 - 9:9 Write finished in 840, resumed
++++++ 19 Write
           10:9 Write pausing
------ 3 - 9:10 Write finished in 842, resumed
++++++ 20 Write
           10:10 Write pausing
++++++++++ WRITE Final - Thu Mar 15 2018 17:33:44
------ 5 - 9:11 Write finished in 889, resumed  <---- #004
------ 15 - 8:12 Write finished in 420, resumed
...
------ 14 - 1:19 Write finished in 1405, resumed
------ 17 - 0:20 Write finished in 1758, resumed


#001
Every time we see 'Write resumed' or 'Write pausing' there are two numbers preceding it. These are the number currently in progress and the number finished processing. If the number in progress is less than 10 (the concurrency) then resume is called on the transform stream, making sure that more messages are pushed into the writer. You can see looking through the logs that the number 'in progress' (the first number) climbs to ten, then oscillates between nine and ten until there are no more messages in the transform queue (#004), at which point it starts to drop to zero as it finishes processing all the messages.

#002 - #003
There are now ten messages being processed by the writer. Another 6 are pulled in to fill the 3 buffers (2 messages in each of the read buffer, incoming and outgoing transform stream buffers)

#003
From here on messages are pushed into the writer as it pauses and resumes the transform stream. The number of messages in process in the async write process will remain at nine or ten for the remainder of the run until all messages have been pushed through the system.

Code

If you analyse the code and the differences between 05_indexAsync and 06_indexEventEmitters you will see that 06 does not have a class as a write stream. Rather, it uses event emitters and listens to the events coming from the transform stream. For each message it will kick off an async processing block.

The effect of this model is that the completion of any write message can cause more messages to flow into the write functions, whereas in the previous code only the last message into the write stream could do that.

Comparing the relative speed of the different approaches

I've done some comparisons for the different approaches I have gone through above. Each test processes messages with a high watermark set to 10. I've varied the number of messages processed and the maximum write concurrency to give some comparison.



Run #  File name              Async  Time taken  # messages  Write concurrency
3.1    03_indexSyncSlow       N      69s         100         1
4.1    04_indexBatchRead      N      68s         100         1
5.1    05_indexAsync          N      67s         100         1
5.2    05_indexAsync          Y      10s         100         10
5.3    05_indexAsync          Y      95s         1000        10
5.4    05_indexAsync          Y      14s         1000        100
6.1    06_indexEventEmitters  N      68s         100         1
6.2    06_indexEventEmitters  Y      8s          100         10
6.3    06_indexEventEmitters  Y      72s         1000        10
6.4    06_indexEventEmitters  Y      9s          1000        100

Clearly the async versions were going to be quicker than the synchronous versions and it's interesting to see that when write concurrency is set to 1 pretty much all of them take the same 68 seconds. I'm attributing the deviation to the randomised wait time within the writers.

The results that interest me the most though are the comparisons between 5.3 and 6.3, asynchronously processing 1000 messages with a concurrency of 10, and then 5.4 and 6.4, processing 1000 messages with a concurrency of 100. The event emitter runs (6.3 and 6.4) were about 25% and 35% quicker than the stream pipe runs (5.3 and 5.4). Again this is not too surprising, given the concurrency in the event emitter version is maintained at the maximum (10 or 100) for the length of the entire run, whereas in the piped version the number of messages being processed by the writer only gets topped up when the last message in completes.
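
For reference, the percentages can be reproduced from the times in the table above. The percentQuicker helper is just an illustrative name; the figure is the time saved relative to the piped run.

```javascript
// Times in seconds, taken from the results table.
const runs = { '5.3': 95, '6.3': 72, '5.4': 14, '6.4': 9 };

// How much quicker the event emitter run was, as a percentage of the piped time.
const percentQuicker = (piped, emitters) => ((piped - emitters) / piped) * 100;

const atConcurrency10 = percentQuicker(runs['5.3'], runs['6.3']).toFixed(1);
const atConcurrency100 = percentQuicker(runs['5.4'], runs['6.4']).toFixed(1);

console.log(atConcurrency10);  // 24.2
console.log(atConcurrency100); // 35.7
```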

If you run the same tests yourself pay attention to the logs of 05_indexAsync which show the number of messages currently being processed, and see how it pulses up and down as the last message finishes (especially when the concurrency is high, try 100 or more).

Conclusion

Like I said at the beginning of this article, I didn't really set out to prove anything with this exercise, more just to get a fuller understanding of how the mechanics of streams work in node. I did this with a view to getting better performance out of the node processes that we employ to process messages off queues, perform business logic on that data and load data into a database. During this investigation/journey there have been lots of learnings, and these have shown that we were using streams correctly but inefficiently. We operate our server side processing on a model similar to 04_indexBatchRead, batching the read from SQS but processing messages in the writer one at a time. I've tried to capture these learnings in code on the github repo found at https://github.com/DamianStanger/streams101

I hope that I managed to impart some of my learnings on to you in the process.

Appendix / references

My code and results - https://github.com/DamianStanger/streams101
Node stream API - https://nodejs.org/dist/latest-v9.x/docs/api/stream.html
Back pressure - https://nodejs.org/en/docs/guides/backpressuring-in-streams/
Lifecycle of a pipe - https://nodejs.org/en/docs/guides/backpressuring-in-streams/#lifecycle-of-pipe
Node event emitters - https://nodejs.org/dist/latest-v9.x/docs/api/events.html

Footnote

I would really appreciate any feedback on this article. Positive comments and constructive feedback are equally welcome as I'm still learning, and I would hate it if there are conclusions drawn that are inaccurate or just plain wrong. Thanks in advance. Do try out the code examples, it will help cement the concepts described in this article.

Wednesday 28 February 2018

Converting CRLF to LF for all files in a git repository

At work we currently have people who do their dev on linux laptops, linux VMs, windows and the WSL which means that we need to be careful about the compatibility of files in git. We deploy to centos in all our production and pre-prod environments so we always check-in linux line endings.

But recently when I was looking through some codez I found a bit of a mix of files with LF and CRLF line endings and so wanted to make them all consistent with the LF linux standard.

git config


I don’t know how this happened exactly and didn’t narrow it down to any one commit but just wanted it fixed. We should all have our git clients set to convert and check in linux line endings which you can check with the command:

git config --list --global
git config --list


You are looking for the setting core.autocrlf. This can take three values: true, false and input. Depending on the OS that you are using you need to ensure that you use the correct setting.

On windows it should be set to true. That is check out windows style (CRLF) but check in linux style (LF).

On linux it needs to be set to false or input, as you don’t want files to contain windows line endings during development, so you check out with LF. You can also leave it as the default, which is false.

I make heavy use of WSL (windows subsystem for linux) as well as centos VMs running on virtualbox. WSL behaves like linux so I have the default set which is not to change the line endings going in or out. But you do have to be careful. If you change the files or create files using a windows editor (I use webstorm and sublime) then you could inadvertently check in windows line endings, so it might be best to use input. Input will checkout as is from the repo but on check in will convert all line endings to LF, just in case a CRLF file was introduced.
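To see what input actually does, here is a small sketch that runs in a throwaway repository. The temp folder and the file name crlf.txt are just made up for the demo, and I set the option locally rather than with --global so that your real config is left alone. It stages a file with windows line endings and then counts the carriage returns in the working copy and in the index.

```shell
# Work in a throwaway repository so nothing real is touched
repo=$(mktemp -d)
cd "$repo"
git init -q .

# Set the option for this repository only (add --global to set it for your user)
git config core.autocrlf input

# Create a file with windows line endings and stage it.
# git warns on stderr that CRLF will be replaced, so silence that for the demo.
printf 'line one\r\nline two\r\n' > crlf.txt
git add crlf.txt 2>/dev/null

# Count carriage returns in the working copy and in the staged (index) copy
working_cr=$(tr -cd '\r' < crlf.txt | wc -c)
staged_cr=$(git show :crlf.txt | tr -cd '\r' | wc -c)

echo "carriage returns in working copy: $working_cr"
echo "carriage returns in index:        $staged_cr"
```

The working copy is left as it was written, but the copy that git has staged should have no carriage returns left in it.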

By the way I love the WSL I use it every day and do prefer it to using a VM running linux, it works great for node dev.

Converting CRLF to LF


Anyway back to the main point of this post. We have some files with windows line endings mixed in with files with linux line endings. How to make them consistent? In particular how to make them all have linux line endings?

The difference is \r\n (windows) vs \n (linux). The tool sed is very good at finding strings in a file and replacing them with another.

sed is a stream editor for filtering and transforming text. We can give it a regex replacement and run that against a file to remove any carriage returns ('\r') from it:

sed -i 's/\r//g' myfilename.js

-i tells sed to do an in place substitution, 's/\r//g' is a regex that searches for carriage return '\r' and replaces them with nothing '//' globally for that file.
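If you want to convince yourself before running it on real code, here is a minimal sketch against a scratch file (the temp file is made up for the demo). It assumes GNU sed, which is what you get on linux and WSL; other sed implementations may not understand '\r' or a bare -i. od -c prints every byte, so the carriage returns are visible as \r before the substitution and gone after it.

```shell
# Create a scratch file containing windows line endings
tmpfile=$(mktemp)
printf 'first\r\nsecond\r\n' > "$tmpfile"

# Before: each line ends with \r \n
od -c "$tmpfile"

# Strip every carriage return in place (GNU sed assumed)
sed -i 's/\r//g' "$tmpfile"

# After: each line ends with just \n
od -c "$tmpfile"
```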

But we have hundreds of files across tens of nested directories. So we need to find all the files we want to 'fix' using the find command.

find . -type f -not -path './.git*' -not -path './node_modules*'

This will recursively list all files from the current directory, excluding any files in the .git or node_modules folders. Do remember to exclude your .git folder as you will corrupt it if you run the substitution against files in there. Also exclude any package folders or binary folders. This depends on the environment you are working in; I'm currently doing node dev so excluding node_modules is good enough for me.

All that remains is to put them together using the standard unix pipe operator and the xargs command, which allows you to build and execute command lines. It takes the output of find, splits it into file names on whitespace, and appends them to the next command. We would use it thus:

find . -type f -not -path './.git*' -not -path './node_modules*' | xargs sed -i 's/\r//g'

If the folder contained 2 files xargs would build a command that looked like this:

sed -i 's/\r//g' ./file1.js ./file2.js
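One thing to watch for: because xargs splits on whitespace, a file name containing a space would be passed to sed as two separate names. A variation that copes with this, assuming GNU find, xargs and sed, is to have find separate the names with a NUL byte (-print0) and tell xargs to split on that (-0). The folder layout and file names below are made up purely for the demo.

```shell
# Build a scratch folder with CRLF files, one of which has a space in its name
workdir=$(mktemp -d)
cd "$workdir"
mkdir -p src node_modules/pkg
printf 'a\r\nb\r\n' > 'src/my file.js'
printf 'c\r\nd\r\n' > src/plain.js
printf 'e\r\nf\r\n' > node_modules/pkg/index.js

# -print0 and -0 separate file names with a NUL byte instead of whitespace,
# so 'src/my file.js' reaches sed as a single argument
find . -type f -not -path './.git*' -not -path './node_modules*' -print0 \
  | xargs -0 sed -i 's/\r//g'

# List the files that still contain a carriage return
remaining=$(grep -rl "$(printf '\r')" . || true)
echo "$remaining"
```

Only the file under node_modules should be listed at the end, as it was excluded from the find.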

Voila!

All CRLF line endings are replaced with LF. You should be able to check this by using git diff to see the changes. You should see all line endings in the unified diff like this:

diff --git a/file1.js b/file1.js
index 01ce825..f5f8e58 100644
--- a/file1.js
+++ b/file1.js
-old line with windows line endings^M
+old line with windows line endings


If you don’t see the ^M but just two lines that look the same then there are a couple of tricks you can try:
git diff -R : This reverses the output. Apparently git does not always highlight removed white space, but will highlight added white space.
git diff | cat -v : This will pipe the raw patch output from the git diff to cat. cat with a -v echoes the input including all non-display characters (like a carriage return) to the console.
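Another check, which is not one I used above but should work on any reasonably recent git, is git ls-files --eol. It reports the line endings of every tracked file, both in the index (i/) and in the working tree (w/). The sketch below uses a throwaway repository with two made up files and filters the report down to anything still carrying CRLF.

```shell
# Throwaway repository with one LF file and one CRLF file
repo=$(mktemp -d)
cd "$repo"
git init -q .
git config core.autocrlf false   # check in exactly what is on disk for this demo
printf 'fixed\n' > fixed.js
printf 'broken\r\n' > broken.js
git add fixed.js broken.js

# Full report: i/ is the index, w/ is the working tree
git ls-files --eol

# Only the files that still have windows line endings somewhere
crlf_files=$(git ls-files --eol | grep -E 'i/crlf|w/crlf' || true)
echo "$crlf_files"
```

Run against a real repository after the conversion, the filtered list should come back empty.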

Appendix

https://git-scm.com/docs/git-config
https://git-scm.com/docs/git-diff
https://manpages.debian.org/stretch/sed/sed.1.en.html
https://manpages.debian.org/stretch/findutils/xargs.1.en.html
https://manpages.debian.org/stretch/findutils/find.1.en.html

Saturday 27 January 2018

git shortcuts

I've just recently had to set up my git environment again. One thing that came to my attention is that I've not got my git aliases written down anywhere. This post is documentation so next time I have to do this I will have them easily to hand.

$ git config --global --list

alias.cm=commit -m
alias.co=checkout
alias.d=diff
alias.ds=diff --staged
alias.l=log --oneline --decorate --graph --all
alias.s=status --short


This can be set up either by adding the following to your git config file ~/.gitconfig
[alias]
  cm = commit -m
  co = checkout
  d  = diff
  ds = diff --staged
  l  = log --oneline --decorate --graph --all
  s  = status --short


Or you can use the console to set the aliases. Any alias made of more than one word needs quoting, otherwise git config tries to interpret the extra words and flags itself.

git config --global alias.cm 'commit -m'
git config --global alias.co checkout
git config --global alias.d  diff
git config --global alias.ds 'diff --staged'
git config --global alias.l  'log --oneline --decorate --graph --all'
git config --global alias.s  'status --short'


Also I use ZSH as my console of choice, which (via oh-my-zsh) has many built in shortcuts for practically all git commands. But to be honest I find them a little cryptic even for me. Feel free to check them out: https://github.com/robbyrussell/oh-my-zsh/wiki/Cheatsheet