All Projects → ludiosarchive → Why Are My Node Streams Hanging

ludiosarchive / Why Are My Node Streams Hanging

Some supplemental node streams documentation

Projects that are alternatives of or similar to Why Are My Node Streams Hanging

swoole-futures
⏳ Futures, Streams & Async/Await for PHP's Swoole asynchronous run-time.
Stars: ✭ 100 (+177.78%)
Mutual labels:  streams
Sqlstreamstore
Stream Store library targeting RDBMS based implementations for .NET
Stars: ✭ 374 (+938.89%)
Mutual labels:  streams
Chunk Store Stream
Convert an abstract-chunk-store compliant store into a readable or writable stream
Stars: ✭ 24 (-33.33%)
Mutual labels:  streams
eloquent-ffmpeg
High-level API for FFmpeg's Command Line Tools
Stars: ✭ 71 (+97.22%)
Mutual labels:  streams
Merry
🌊🌊⛵️🌊🌊 - cute streaming API framework
Stars: ✭ 308 (+755.56%)
Mutual labels:  streams
Kafka Streams
equivalent to kafka-streams 🐙 for nodejs ✨🐢🚀✨
Stars: ✭ 613 (+1602.78%)
Mutual labels:  streams
web-streams-polyfill
Web Streams, based on the WHATWG spec reference implementation
Stars: ✭ 198 (+450%)
Mutual labels:  streams
Capture Stream
Capture stream output.
Stars: ✭ 10 (-72.22%)
Mutual labels:  streams
Rpd
👌 A Minimal Engine for creating Node-Based Visual Programming User Interfaces
Stars: ✭ 370 (+927.78%)
Mutual labels:  streams
Sseredis
Redis Streams and PubSub to Server-Sent Event bridge in Go
Stars: ✭ 23 (-36.11%)
Mutual labels:  streams
kerala
Distributed KV Streams
Stars: ✭ 16 (-55.56%)
Mutual labels:  streams
Advanced Nodejs
For help, ask in #questions at slack.jscomplete.com
Stars: ✭ 273 (+658.33%)
Mutual labels:  streams
Go Streams
A lightweight stream processing library for Go
Stars: ✭ 615 (+1608.33%)
Mutual labels:  streams
core
The XP Framework is an all-purpose, object oriented PHP framework.
Stars: ✭ 13 (-63.89%)
Mutual labels:  streams
Frosty
serialize native Nim types to strings, streams, or sockets ⛄
Stars: ✭ 25 (-30.56%)
Mutual labels:  streams
stream-snitch
Event emitter for watching text streams with regex patterns
Stars: ✭ 19 (-47.22%)
Mutual labels:  streams
Execa
Process execution for humans
Stars: ✭ 4,318 (+11894.44%)
Mutual labels:  streams
Node Scalable Blob Store
A file system blob store that is designed to prevent conflicts when used with a distributed file system or storage area network
Stars: ✭ 31 (-13.89%)
Mutual labels:  streams
Remote Web Streams
Web streams that work across web workers and iframes.
Stars: ✭ 26 (-27.78%)
Mutual labels:  streams
Sql Streams
Painless low level jdbc abstraction using the java 8 stream api.
Stars: ✭ 17 (-52.78%)
Mutual labels:  streams

Why are my node streams hanging / not ending / losing data?

This might be happening because:

You're returning a stream to the caller, but you added a 'data' handler to it without also pausing it

Attaching a 'data' handler puts the stream into flowing mode. If the caller doesn't immediately (in the current tick) attach data/end handlers, they'll start losing data and possibly the 'end' event.

Possibly very bad:

	let bytesRead = 0;
	outputStream.on('data', function(data) {
		bytesRead += data.length;
	});
	return outputStream;

Probably right:

	let bytesRead = 0;
	outputStream.on('data', function(data) {
		bytesRead += data.length;
	});
	// We attached a 'data' handler, but don't let that put us into
	// flowing mode yet, because the user hasn't attached their own
	// 'data' handler yet.
	outputStream.pause();
	return outputStream;

You're listening to the wrong 'end' / 'finish' event

'end' fires when a Readable stops reading; 'finish' fires when a Writable stops writing.

https://nodejs.org/api/stream.html#stream_event_end

https://nodejs.org/api/stream.html#stream_event_finish - talks about the "end() method", which has nothing to do with the 'end' event!

You're not handling the 'error' event

An 'error' event can also end a Readable or finish a Writable, so you need to handle it.

Your Transform stream neglects to call callback() in some cases

Your _transform and _flush implementations need to call callback(), optionally with an error or data, in all cases. Not necessarily within the same tick, but it must be called.

Buggy code that results in a stream that sometimes doesn't fire 'end':

	_flush(callback) {
		// Last block might not be full-size, and now that we know we've reached
		// the end, we handle it here.
		if(!this._buf.length) {
			return;
		}
		const crc = calculate(this._buf);
		if(crc !== this._crc) {
			callback(new Error("bad crc"));
		}
		this.push(this._buf);
		callback();
	}

Less-buggy code with the missing line added:

	_flush(callback) {
		// Last block might not be full-size, and now that we know we've reached
		// the end, we handle it here.
		if(!this._buf.length) {
			callback(); // <------------------------
			return;
		}
		const crc = calculate(this._buf);
		if(crc !== this._crc) {
			callback(new Error("bad crc"));
		}
		this.push(this._buf);
		callback();
	}

You're piping an already-ended stream

See the next section for an example.

You're using a buggy library

If you tell google-api-nodejs-client to upload a stream, the library might pipe your stream into the oauth2 token refresh request instead of the actual API request: https://github.com/google/google-api-nodejs-client/issues/260. It will then try to pipe the already-ended stream into the API request, which will hang forever.

You can "patch" the library by subclassing the OAuth2 implementation:

const google = require('googleapis');
const OAuth2 = google.auth.OAuth2;

class FixedOAuth2 extends OAuth2 {
	// Work around https://github.com/google/google-api-nodejs-client/issues/260
	// by patching getRequestMetadata with something that never returns an
	// auth request to googleapis/lib/apirequest.js:createAPIRequest
	//
	// If we don't patch this, the buggy googleapis/google-auth-library interaction
	// will hang our program forever when we try to upload a file when our access token
	// is expired.  (createAPIRequest decides to pipe the stream into the auth request
	// instead of the subsequent request.)
	//
	// We could always refresh the access token ourselves, but we prefer to also
	// patch the buggy code to prevent bugs from compounding.
	getRequestMetadata(optUri, metadataCb) {
		const thisCreds = this.credentials;

		if(!thisCreds.access_token && !thisCreds.refresh_token) {
			return metadataCb(new Error('No access or refresh token is set.'), null);
		}

		// if no expiry time, assume it's not expired
		const expiryDate = thisCreds.expiry_date;
		const isTokenExpired = expiryDate ? expiryDate <= (new Date()).getTime() : false;

		if(thisCreds.access_token && !isTokenExpired) {
			thisCreds.token_type = thisCreds.token_type || 'Bearer';
			const headers = {'Authorization': thisCreds.token_type + ' ' + thisCreds.access_token};
			return metadataCb(null, headers, null);
		} else {
			return metadataCb(new Error('Access token is expired.'), null);
		}
	}
}

Note that you'll still need to make the calls to refresh the access token yourself.

You can use it with something like:

	const oauth2Client = new FixedOAuth2(clientId, clientSecret, redirectUrl);
	const drive = google.drive({version: 'v2', auth: oauth2Client});

Other tips

streamA.pipe(streamB) is broken because it doesn't forward errors (bug), which is not completely obvious in the documentation. You really want to forward errors. Use a pipeWithErrors function instead of stream.pipe:

function pipeWithErrors(src, dest) {
	src.pipe(dest);
	src.once('error', function(err) {
		dest.emit('error', err);
	});
}

pipeWithErrors(streamA, streamB);
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].