mirror of
https://github.com/thewesker/bug-em.git
synced 2025-12-23 13:31:05 -05:00
lol
This commit is contained in:
358
node_modules/twit/lib/streaming-api-connection.js
generated
vendored
Normal file
358
node_modules/twit/lib/streaming-api-connection.js
generated
vendored
Normal file
@@ -0,0 +1,358 @@
|
||||
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
var util = require('util');
|
||||
|
||||
var helpers = require('./helpers')
|
||||
var Parser = require('./parser');
|
||||
var request = require('request');
|
||||
var zlib = require('zlib');
|
||||
|
||||
var STATUS_CODES_TO_ABORT_ON = require('./settings').STATUS_CODES_TO_ABORT_ON
|
||||
|
||||
var StreamingAPIConnection = function (reqOpts, twitOptions) {
|
||||
this.reqOpts = reqOpts
|
||||
this.twitOptions = twitOptions
|
||||
this._twitter_time_minus_local_time_ms = 0
|
||||
EventEmitter.call(this)
|
||||
}
|
||||
|
||||
util.inherits(StreamingAPIConnection, EventEmitter)
|
||||
|
||||
/**
|
||||
* Resets the connection.
|
||||
* - clears request, response, parser
|
||||
* - removes scheduled reconnect handle (if one was scheduled)
|
||||
* - stops the stall abort timeout handle (if one was scheduled)
|
||||
*/
|
||||
StreamingAPIConnection.prototype._resetConnection = function () {
|
||||
if (this.request) {
|
||||
// clear our reference to the `request` instance
|
||||
this.request.removeAllListeners();
|
||||
this.request.destroy();
|
||||
}
|
||||
|
||||
if (this.response) {
|
||||
// clear our reference to the http.IncomingMessage instance
|
||||
this.response.removeAllListeners();
|
||||
this.response.destroy();
|
||||
}
|
||||
|
||||
if (this.parser) {
|
||||
this.parser.removeAllListeners()
|
||||
}
|
||||
|
||||
// ensure a scheduled reconnect does not occur (if one was scheduled)
|
||||
// this can happen if we get a close event before .stop() is called
|
||||
clearTimeout(this._scheduledReconnect)
|
||||
delete this._scheduledReconnect
|
||||
|
||||
// clear our stall abort timeout
|
||||
this._stopStallAbortTimeout()
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets the parameters used in determining the next reconnect time
|
||||
*/
|
||||
StreamingAPIConnection.prototype._resetRetryParams = function () {
|
||||
// delay for next reconnection attempt
|
||||
this._connectInterval = 0
|
||||
// flag indicating whether we used a 0-delay reconnect
|
||||
this._usedFirstReconnect = false
|
||||
}
|
||||
|
||||
StreamingAPIConnection.prototype._startPersistentConnection = function () {
|
||||
var self = this;
|
||||
self._resetConnection();
|
||||
self._setupParser();
|
||||
self._resetStallAbortTimeout();
|
||||
self._setOauthTimestamp();
|
||||
self.request = request.post(this.reqOpts);
|
||||
self.emit('connect', self.request);
|
||||
self.request.on('response', function (response) {
|
||||
self._updateOauthTimestampOffsetFromResponse(response)
|
||||
// reset our reconnection attempt flag so next attempt goes through with 0 delay
|
||||
// if we get a transport-level error
|
||||
self._usedFirstReconnect = false;
|
||||
// start a stall abort timeout handle
|
||||
self._resetStallAbortTimeout();
|
||||
self.response = response
|
||||
if (STATUS_CODES_TO_ABORT_ON.indexOf(self.response.statusCode) !== -1) {
|
||||
// We got a status code telling us we should abort the connection.
|
||||
// Read the body from the response and return an error to the user.
|
||||
var body = '';
|
||||
var compressedBody = '';
|
||||
|
||||
self.response.on('data', function (chunk) {
|
||||
compressedBody += chunk.toString('utf8');
|
||||
})
|
||||
|
||||
var gunzip = zlib.createGunzip();
|
||||
self.response.pipe(gunzip);
|
||||
gunzip.on('data', function (chunk) {
|
||||
body += chunk.toString('utf8')
|
||||
})
|
||||
|
||||
gunzip.on('end', function () {
|
||||
try {
|
||||
body = JSON.parse(body)
|
||||
} catch (jsonDecodeError) {
|
||||
// Twitter may send an HTML body
|
||||
// if non-JSON text was returned, we'll just attach it to the error as-is
|
||||
}
|
||||
// surface the error to the user
|
||||
var error = helpers.makeTwitError('Bad Twitter streaming request: ' + self.response.statusCode)
|
||||
error.statusCode = response ? response.statusCode: null;
|
||||
helpers.attachBodyInfoToError(error, body)
|
||||
self.emit('error', error);
|
||||
// stop the stream explicitly so we don't reconnect
|
||||
self.stop()
|
||||
body = null;
|
||||
});
|
||||
gunzip.on('error', function (err) {
|
||||
// If Twitter sends us back an uncompressed HTTP response, gzip will error out.
|
||||
// Handle this by emitting an error with the uncompressed response body.
|
||||
var errMsg = 'Gzip error: ' + err.message;
|
||||
var twitErr = helpers.makeTwitError(errMsg);
|
||||
twitErr.statusCode = self.response.statusCode;
|
||||
helpers.attachBodyInfoToError(twitErr, compressedBody);
|
||||
self.emit('parser-error', twitErr);
|
||||
});
|
||||
} else if (self.response.statusCode === 420) {
|
||||
// close the connection forcibly so a reconnect is scheduled by `self.onClose()`
|
||||
self._scheduleReconnect();
|
||||
} else {
|
||||
// We got an OK status code - the response should be valid.
|
||||
// Read the body from the response and return to the user.
|
||||
var gunzip = zlib.createGunzip();
|
||||
self.response.pipe(gunzip);
|
||||
|
||||
//pass all response data to parser
|
||||
gunzip.on('data', function (chunk) {
|
||||
self._connectInterval = 0
|
||||
// stop stall timer, and start a new one
|
||||
self._resetStallAbortTimeout();
|
||||
self.parser.parse(chunk.toString('utf8'));
|
||||
});
|
||||
|
||||
gunzip.on('close', self._onClose.bind(self))
|
||||
gunzip.on('error', function (err) {
|
||||
self.emit('error', err);
|
||||
})
|
||||
self.response.on('error', function (err) {
|
||||
// expose response errors on twit instance
|
||||
self.emit('error', err);
|
||||
})
|
||||
|
||||
// connected without an error response from Twitter, emit `connected` event
|
||||
// this must be emitted after all its event handlers are bound
|
||||
// so the reference to `self.response` is not interfered-with by the user until it is emitted
|
||||
self.emit('connected', self.response);
|
||||
}
|
||||
});
|
||||
self.request.on('close', self._onClose.bind(self));
|
||||
self.request.on('error', function (err) { self._scheduleReconnect.bind(self) });
|
||||
return self;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle when the request or response closes.
|
||||
* Schedule a reconnect according to Twitter's reconnect guidelines
|
||||
*
|
||||
*/
|
||||
StreamingAPIConnection.prototype._onClose = function () {
|
||||
var self = this;
|
||||
self._stopStallAbortTimeout();
|
||||
if (self._scheduledReconnect) {
|
||||
// if we already have a reconnect scheduled, don't schedule another one.
|
||||
// this race condition can happen if the http.ClientRequest and http.IncomingMessage both emit `close`
|
||||
return
|
||||
}
|
||||
|
||||
self._scheduleReconnect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Kick off the http request, and persist the connection
|
||||
*
|
||||
*/
|
||||
StreamingAPIConnection.prototype.start = function () {
|
||||
this._resetRetryParams();
|
||||
this._startPersistentConnection();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort the http request, stop scheduled reconnect (if one was scheduled) and clear state
|
||||
*
|
||||
*/
|
||||
StreamingAPIConnection.prototype.stop = function () {
|
||||
// clear connection variables and timeout handles
|
||||
this._resetConnection();
|
||||
this._resetRetryParams();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop and restart the stall abort timer (called when new data is received)
|
||||
*
|
||||
* If we go 90s without receiving data from twitter, we abort the request & reconnect.
|
||||
*/
|
||||
StreamingAPIConnection.prototype._resetStallAbortTimeout = function () {
|
||||
var self = this;
|
||||
// stop the previous stall abort timer
|
||||
self._stopStallAbortTimeout();
|
||||
//start a new 90s timeout to trigger a close & reconnect if no data received
|
||||
self._stallAbortTimeout = setTimeout(function () {
|
||||
self._scheduleReconnect()
|
||||
}, 90000);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop stall timeout
|
||||
*
|
||||
*/
|
||||
StreamingAPIConnection.prototype._stopStallAbortTimeout = function () {
|
||||
clearTimeout(this._stallAbortTimeout);
|
||||
// mark the timer as `null` so it is clear via introspection that the timeout is not scheduled
|
||||
delete this._stallAbortTimeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the next time a reconnect should occur (based on the last HTTP response received)
|
||||
* and starts a timeout handle to begin reconnecting after `self._connectInterval` passes.
|
||||
*
|
||||
* @return {Undefined}
|
||||
*/
|
||||
StreamingAPIConnection.prototype._scheduleReconnect = function () {
|
||||
var self = this;
|
||||
if (self.response && self.response.statusCode === 420) {
|
||||
// we are being rate limited
|
||||
// start with a 1 minute wait and double each attempt
|
||||
if (!self._connectInterval) {
|
||||
self._connectInterval = 60000;
|
||||
} else {
|
||||
self._connectInterval *= 2;
|
||||
}
|
||||
} else if (self.response && String(self.response.statusCode).charAt(0) === '5') {
|
||||
// twitter 5xx errors
|
||||
// start with a 5s wait, double each attempt up to 320s
|
||||
if (!self._connectInterval) {
|
||||
self._connectInterval = 5000;
|
||||
} else if (self._connectInterval < 320000) {
|
||||
self._connectInterval *= 2;
|
||||
} else {
|
||||
self._connectInterval = 320000;
|
||||
}
|
||||
} else {
|
||||
// we did not get an HTTP response from our last connection attempt.
|
||||
// DNS/TCP error, or a stall in the stream (and stall timer closed the connection)
|
||||
if (!self._usedFirstReconnect) {
|
||||
// first reconnection attempt on a valid connection should occur immediately
|
||||
self._connectInterval = 0;
|
||||
self._usedFirstReconnect = true;
|
||||
} else if (self._connectInterval < 16000) {
|
||||
// linearly increase delay by 250ms up to 16s
|
||||
self._connectInterval += 250;
|
||||
} else {
|
||||
// cap out reconnect interval at 16s
|
||||
self._connectInterval = 16000;
|
||||
}
|
||||
}
|
||||
|
||||
// schedule the reconnect
|
||||
self._scheduledReconnect = setTimeout(function () {
|
||||
self._startPersistentConnection();
|
||||
}, self._connectInterval);
|
||||
self.emit('reconnect', self.request, self.response, self._connectInterval);
|
||||
}
|
||||
|
||||
StreamingAPIConnection.prototype._setupParser = function () {
|
||||
var self = this
|
||||
self.parser = new Parser()
|
||||
|
||||
// handle twitter objects as they come in - emit the generic `message` event
|
||||
// along with the specific event corresponding to the message
|
||||
self.parser.on('element', function (msg) {
|
||||
self.emit('message', msg)
|
||||
|
||||
if (msg.delete) { self.emit('delete', msg) }
|
||||
else if (msg.disconnect) { self._handleDisconnect(msg) }
|
||||
else if (msg.limit) { self.emit('limit', msg) }
|
||||
else if (msg.scrub_geo) { self.emit('scrub_geo', msg) }
|
||||
else if (msg.warning) { self.emit('warning', msg) }
|
||||
else if (msg.status_withheld) { self.emit('status_withheld', msg) }
|
||||
else if (msg.user_withheld) { self.emit('user_withheld', msg) }
|
||||
else if (msg.friends || msg.friends_str) { self.emit('friends', msg) }
|
||||
else if (msg.direct_message) { self.emit('direct_message', msg) }
|
||||
else if (msg.event) {
|
||||
self.emit('user_event', msg)
|
||||
// reference: https://dev.twitter.com/docs/streaming-apis/messages#User_stream_messages
|
||||
var ev = msg.event
|
||||
|
||||
if (ev === 'blocked') { self.emit('blocked', msg) }
|
||||
else if (ev === 'unblocked') { self.emit('unblocked', msg) }
|
||||
else if (ev === 'favorite') { self.emit('favorite', msg) }
|
||||
else if (ev === 'unfavorite') { self.emit('unfavorite', msg) }
|
||||
else if (ev === 'follow') { self.emit('follow', msg) }
|
||||
else if (ev === 'unfollow') { self.emit('unfollow', msg) }
|
||||
else if (ev === 'user_update') { self.emit('user_update', msg) }
|
||||
else if (ev === 'list_created') { self.emit('list_created', msg) }
|
||||
else if (ev === 'list_destroyed') { self.emit('list_destroyed', msg) }
|
||||
else if (ev === 'list_updated') { self.emit('list_updated', msg) }
|
||||
else if (ev === 'list_member_added') { self.emit('list_member_added', msg) }
|
||||
else if (ev === 'list_member_removed') { self.emit('list_member_removed', msg) }
|
||||
else if (ev === 'list_user_subscribed') { self.emit('list_user_subscribed', msg) }
|
||||
else if (ev === 'list_user_unsubscribed') { self.emit('list_user_unsubscribed', msg) }
|
||||
else if (ev === 'quoted_tweet') { self.emit('quoted_tweet', msg) }
|
||||
else if (ev === 'favorited_retweet') { self.emit('favorited_retweet', msg) }
|
||||
else if (ev === 'retweeted_retweet') { self.emit('retweeted_retweet', msg) }
|
||||
else { self.emit('unknown_user_event', msg) }
|
||||
} else { self.emit('tweet', msg) }
|
||||
})
|
||||
|
||||
self.parser.on('error', function (err) {
|
||||
self.emit('parser-error', err)
|
||||
});
|
||||
self.parser.on('connection-limit-exceeded', function (err) {
|
||||
self.emit('error', err);
|
||||
})
|
||||
}
|
||||
|
||||
StreamingAPIConnection.prototype._handleDisconnect = function (twitterMsg) {
|
||||
this.emit('disconnect', twitterMsg);
|
||||
this.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Call whenever an http request is about to be made to update
|
||||
* our local timestamp (used for Oauth) to be Twitter's server time.
|
||||
*
|
||||
*/
|
||||
StreamingAPIConnection.prototype._setOauthTimestamp = function () {
|
||||
var self = this;
|
||||
if (self.reqOpts.oauth) {
|
||||
var oauth_ts = Date.now() + self._twitter_time_minus_local_time_ms;
|
||||
self.reqOpts.oauth.timestamp = Math.floor(oauth_ts/1000).toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Call whenever an http response is received from Twitter,
|
||||
* to set our local timestamp offset from Twitter's server time.
|
||||
* This is used to set the Oauth timestamp for our next http request
|
||||
* to Twitter (by calling _setOauthTimestamp).
|
||||
*
|
||||
* @param {http.IncomingResponse} resp http response received from Twitter.
|
||||
*/
|
||||
StreamingAPIConnection.prototype._updateOauthTimestampOffsetFromResponse = function (resp) {
|
||||
if (resp && resp.headers && resp.headers.date &&
|
||||
new Date(resp.headers.date).toString() !== 'Invalid Date'
|
||||
) {
|
||||
var twitterTimeMs = new Date(resp.headers.date).getTime()
|
||||
this._twitter_time_minus_local_time_ms = twitterTimeMs - Date.now();
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = StreamingAPIConnection
|
||||
Reference in New Issue
Block a user