Wednesday 8 November 2017

Help! Mysterious read stream behaviour sync vs async.

Hello. I have been playing with implementing my own readable streams recently, usually for reading an API until some condition is met or the like.

My problem is that I seem to have stumbled on two different behaviours depending on whether the _read() method is synchronous or asynchronous.

In the docs it says:

"When readable._read() is called, if data is available from the resource, the implementation should begin pushing that data into the read queue using the this.push(dataChunk) method. _read() should continue reading from the resource and pushing data until readable.push() returns false. Only when _read() is called again after it has stopped should it resume pushing additional data onto the queue.

Note: Once the readable._read() method has been called, it will not be called again until the readable.push() method is called."

Which makes me think we should push data until the buffer is full. With a synchronous implementation this seems to work fine.

Suppose we implement a non-stop stream of integers. (I won't include the whole class boilerplate, just the _read implementations.)

    // this.i equals 0
    _read() {
      let ok = true;
      while (ok) {
        ok = this.push(this.i);
        this.i += 1;
      }
    }

Then this works as expected. The control flow stays inside the while loop.

However, suppose we were getting these numbers asynchronously:

    _read() {
      Promise.resolve()
        .then(() => {
          let ok = true;
          while (ok) {
            ok = this.push(this.i);
            this.i += 1;
          }
        });
    }

As soon as this.push is called, another _read() is called and another promise is created before we even increment this.i.

Even worse, suppose there was an initial push:

    _read() {
      this.push('first');
      Promise.resolve()
        .then(() => this.push('second'));
    }

"second" will never be pushed. This is understandable if you assume that this.push itself calls an internal this._read()...
However, that is not what happens in the synchronous version: there, the function ends before a new one is called.

In the end, the rule I have made for myself to get reliable asynchronous read streams is to allow only one push per _read() and to make that push the last operation of that read. If there are any state changes to make, do them before pushing, or better yet, abstract the state away inside a closure, such as a generator.

However, this behaviour seems a little erratic, and if anyone who understands stream internals can explain it, I would be very thankful. Thanks.

(PS: I know these are not real-world examples; they are only there to demonstrate control flow, or for anyone who wants to try.)
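
For anyone who wants to try the one-push-per-_read() rule, here is a minimal sketch of it. It assumes object mode (so plain numbers can be pushed) and keeps the counter inside a generator. The names integers, AsyncCounter and collect are made up for illustration, and Promise.resolve() only stands in for a real asynchronous source.

```javascript
const { Readable } = require('stream');

// Hypothetical source: the counter state lives inside the generator closure.
function* integers(limit) {
  for (let i = 0; i < limit; i += 1) {
    yield i;
  }
}

// Illustrative class: each _read() call results in exactly one push().
class AsyncCounter extends Readable {
  constructor(limit) {
    super({ objectMode: true }); // object mode so numbers can be pushed as-is
    this.source = integers(limit);
  }

  _read() {
    Promise.resolve() // stand-in for an asynchronous fetch
      .then(() => {
        // Advance the state first...
        const { value, done } = this.source.next();
        // ...then push once, as the last operation (null ends the stream).
        this.push(done ? null : value);
      })
      .catch((err) => this.destroy(err));
  }
}

// Helper for trying it out: gathers every chunk into an array.
function collect(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('data', (chunk) => chunks.push(chunk));
    stream.on('end', () => resolve(chunks));
    stream.on('error', reject);
  });
}
```

With this shape, collect(new AsyncCounter(5)) should resolve with the integers 0 through 4 in order. Because the state is advanced before the single push, it should not matter whether push() triggers the next _read() immediately or on a later tick.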

Submitted November 09, 2017 at 02:39AM by davidmdm
