From 6f8244c1ab1ecb63356c8b4c8b9a754951a0504d Mon Sep 17 00:00:00 2001 From: Jan Kryl Date: Wed, 28 Sep 2016 10:31:02 +0200 Subject: [PATCH] Fix exception handling in read message path --- lib/index.js | 53 ++++++++++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/lib/index.js b/lib/index.js index 0656d4d..f7df6b7 100644 --- a/lib/index.js +++ b/lib/index.js @@ -623,29 +623,6 @@ Socket.prototype.send = function(msg, flags, cb) { return this; }; -Socket.prototype._emitMessage = function (message) { - if (message.length === 1) { - // hot path - this.emit('message', message[0]); - } else { - this.emit.apply(this, ['message'].concat(message)); - } -} - -Socket.prototype._flushRead = function () { - try { - var message = this._zmq.readv(); // can throw - if (!message) { - return false; - } - // Handle received message immediately to prevent memory leak in driver - this._emitMessage(message) - } catch (error) { - this.emit('error', error); // can throw - } - return true; -}; - Socket.prototype._flushWrite = function () { var batch = this._outgoing.fetch(); if (!batch) { @@ -675,7 +652,35 @@ Socket.prototype._flushReads = function() { this._isFlushingReads = true; - while (this._flushRead()); + var message; + + do { + try { + message = this._zmq.readv(); // can throw + } catch (error) { + this._isFlushingReads = false; + this.emit('error', error); // can throw + return; + } + + if (message) { + try { + // Handle received message immediately to prevent memory leak in driver + if (message.length === 1) { + // hot path + this.emit('message', message[0]); + } else { + this.emit.apply(this, ['message'].concat(message)); + } + } catch (error) { + // There might be additional unprocessed messages so continue reading + // after we throw the exception. + process.nextTick(this._flushReads.bind(this)); + this._isFlushingReads = false; + throw error; + } + } + } while (message); this._isFlushingReads = false;