Name: nsqjs
Owner: npm
Description: NodeJS client library for NSQ
Forked from: dudleycarr/nsqjs
Created: 2017-04-25 20:52:40.0
Updated: 2018-01-22 18:48:31.0
Pushed: 2017-06-28 04:30:47.0
Size: 448
Language: JavaScript
The official NodeJS client for the nsq client protocol. This implementation attempts to be fully compliant and maintain feature parity with the official Go (go-nsq) and Python (pynsq) clients.
The topic and channel arguments are strings and must be specified. The options argument is optional. Below are the parameters that can be specified in the options object.
The maximum number of messages to process at once. This value is shared between nsqd connections. It's highly recommended that this value is greater than the number of nsqd connections.
The frequency in seconds at which nsqd will send heartbeats to this Reader.
The maximum amount of time (seconds) the Reader will back off for any single backoff event.
The number of times a given message will be attempted (given to the MESSAGE handler) before it is handed to the DISCARD handler and then automatically finished. 0 means that there is **no limit.** If no DISCARD handler is specified and `maxAttempts > 0`, then the message will be finished automatically when the number of attempts has been exhausted.
The default amount of time (seconds) a requeued message should be delayed before being dispatched by nsqd.
A string or an array of strings representing the host/port pair for nsqd instances.
> For example: `['localhost:4150']`

A string or an array of strings representing the host/port pair of nsqlookupd instances or the full HTTP/HTTPS URIs of the nsqlookupd instances.
> For example: `['localhost:4161']`, `['http://localhost/lookup']`, `['http://localhost/path/lookup?extra_param=true']`

The frequency in seconds for querying lookupd instances.
The jitter applied to the start of querying lookupd instances periodically.
Use TLS if nsqd has TLS support enabled.
Require verification of the TLS cert. This needs to be false if you're using a self-signed cert.
Use zlib Deflate compression.
The zlib Deflate compression level to use.
Use Snappy compression.
Authenticate using the provided auth secret.
The size in bytes of the buffer nsqd will use when writing to this client. -1 disables buffering. ```outputBufferSize >= 64```
The timeout after which any data that nsqd has buffered will be flushed to this client. Value is in milliseconds. ```outputBufferTimeout >= 1```. A value of ```-1``` disables timeouts.
Sets the server-side message timeout in milliseconds for messages delivered to this client.
Deliver a percentage of all messages received to this connection. ```1 <= sampleRate <= 99```
An identifier used to disambiguate this client.
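The numeric ranges given for `outputBufferSize`, `outputBufferTimeout` and `sampleRate` can be checked before a Reader is constructed. The following is a minimal sketch, not part of nsqjs: `validateReaderOptions` is a hypothetical helper, and it assumes that an unset option (`null` or `undefined`) means "use the default" and that all three values are integers.

```javascript
// Hypothetical helper (not part of nsqjs): checks the documented ranges for
// outputBufferSize, outputBufferTimeout and sampleRate.
// Assumption: null/undefined means the option was not set.
function validateReaderOptions(options) {
  const errors = [];
  const isSet = (value) => value !== null && value !== undefined;
  const { outputBufferSize, outputBufferTimeout, sampleRate } = options;

  // -1 disables buffering; otherwise the size must be at least 64 bytes.
  if (isSet(outputBufferSize) && outputBufferSize !== -1 &&
      !(Number.isInteger(outputBufferSize) && outputBufferSize >= 64)) {
    errors.push('outputBufferSize must be -1 or an integer >= 64');
  }

  // -1 disables timeouts; otherwise the timeout must be at least 1 ms.
  if (isSet(outputBufferTimeout) && outputBufferTimeout !== -1 &&
      !(Number.isInteger(outputBufferTimeout) && outputBufferTimeout >= 1)) {
    errors.push('outputBufferTimeout must be -1 or an integer >= 1');
  }

  // The sample rate is a percentage between 1 and 99 inclusive.
  if (isSet(sampleRate) &&
      !(Number.isInteger(sampleRate) && sampleRate >= 1 && sampleRate <= 99)) {
    errors.push('sampleRate must be an integer between 1 and 99');
  }

  return errors;
}

// Two problems are reported here: the buffer is too small and the rate is too high.
console.log(validateReaderOptions({ outputBufferSize: 32, sampleRate: 100 }));
```

Returning a list of messages rather than throwing on the first problem lets a caller report every misconfigured option at once.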
Reader events are:
`Reader.MESSAGE` or `message`
`Reader.DISCARD` or `discard`
`Reader.ERROR` or `error`
`Reader.NSQD_CONNECTED` or `nsqd_connected`
`Reader.NSQD_CLOSED` or `nsqd_closed`

`Reader.MESSAGE` and `Reader.DISCARD` both produce `Message` objects.
`Reader.NSQD_CONNECTED` and `Reader.NSQD_CLOSED` events both provide the host and port of the nsqd to which the event pertains.
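To make the relationship between the `message` and `discard` events and the attempt limit concrete, here is a small sketch built on Node's `EventEmitter`. `SketchReader` and its `dispatch` method are hypothetical stand-ins, not nsqjs code, and the exact boundary (a message is treated as exhausted once `attempts` exceeds `maxAttempts`) is an assumption of this sketch.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a Reader; only the dispatch rule is modelled.
class SketchReader extends EventEmitter {
  constructor(maxAttempts = 0) {
    super();
    this.maxAttempts = maxAttempts; // 0 means no limit
  }

  // `message` is assumed to look like { attempts: number, finish: function }.
  dispatch(message) {
    // Assumption: exhausted means the attempt count has passed the limit.
    const exhausted = this.maxAttempts > 0 && message.attempts > this.maxAttempts;

    if (!exhausted) {
      this.emit('message', message);
      return;
    }

    // Hand the message to the discard handler if one is registered,
    // then finish it automatically either way.
    if (this.listenerCount('discard') > 0) {
      this.emit('discard', message);
    }
    message.finish();
  }
}

const seen = [];
const reader = new SketchReader(2);
reader.on('message', (msg) => seen.push(`message:${msg.attempts}`));
reader.on('discard', (msg) => seen.push(`discard:${msg.attempts}`));

let autoFinished = 0;
for (let attempts = 1; attempts <= 3; attempts += 1) {
  reader.dispatch({ attempts, finish: () => { autoFinished += 1; } });
}

console.log(seen, autoFinished);
```

In this model the first two deliveries reach the `message` handler, and the third goes to the `discard` handler before being finished without any action from user code.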
These methods are available on a Reader object:
`connect()`
`close()`
`pause()`
`unpause()`
`isPaused()` returns `true` if paused, `false` otherwise.

The following properties and methods are available on Message objects produced by a Reader instance.
timestamp
attempts
id
hasResponded
body
json()
timeUntilTimeout(hard=false)
finish()
requeue(delay=null, backoff=true)
The delay is in milliseconds. This is how long nsqd will hold on to the message before attempting it again. The backoff parameter indicates that this should be treated as an error within this process and that the Reader needs to back off to recover.
touch()
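The text does not say how the length of a backoff period is computed. Purely as an illustration, the sketch below assumes a doubling delay capped at a maximum, which is one common way to bound a single backoff event. `backoffSeconds`, `maxBackoffSeconds` and `baseSeconds` are hypothetical names, and the scheme nsqjs actually uses may differ.

```javascript
// Illustrative only: capped exponential backoff, NOT nsqjs's own algorithm.
// consecutiveFailures: how many requeues with backoff happened in a row.
// maxBackoffSeconds:   upper bound for any single backoff period.
// baseSeconds:         assumed length of the first backoff period.
function backoffSeconds(consecutiveFailures, maxBackoffSeconds, baseSeconds = 1) {
  if (consecutiveFailures <= 0) {
    return 0; // nothing has failed, so no backoff is needed
  }
  const uncapped = baseSeconds * Math.pow(2, consecutiveFailures - 1);
  return Math.min(uncapped, maxBackoffSeconds);
}

// Print the delay for the first few failures with a 128 second cap.
for (let failures = 0; failures <= 9; failures += 1) {
  console.log(failures, backoffSeconds(failures, 128));
}
```

The cap matters because an uncapped doubling delay grows past any useful recovery window after only a handful of failures.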
Allows messages to be sent to an nsqd.
Available Writer options:
Use TLS if nsqd has TLS support enabled.
Require verification of the TLS cert. This needs to be false if you're using a self-signed cert.
Use zlib Deflate compression.
The zlib Deflate compression level to use.
Use Snappy compression.
An identifier used to disambiguate this client.
Writer events are:
`Writer.READY` or `ready`
`Writer.CLOSED` or `closed`
`Writer.ERROR` or `error`
These methods are available on a Writer object:
connect()
close()
`publish(topic, msgs, [callback])`
`topic` is a string. `msgs` is either a string, a `Buffer`, a JSON serializable object, or a list of strings / `Buffers` / JSON serializable objects. `callback` takes a single `error` argument.

Start nsqd and nsqlookupd

    # nsqlookupd listens on 4161 for HTTP requests and 4160 for TCP requests
    $ nsqlookupd &
    $ nsqd --lookupd-tcp-address=127.0.0.1:4160 &

JavaScript:

    var nsq = require('nsqjs');

    var reader = new nsq.Reader('sample_topic', 'test_channel', {
      lookupdHTTPAddresses: '127.0.0.1:4161'
    });

    reader.connect();

    reader.on('message', function (msg) {
      console.log('Received message [%s]: %s', msg.id, msg.body.toString());
      msg.finish();
    });

CoffeeScript:

    nsq = require 'nsqjs'

    topic = 'sample_topic'
    channel = 'test_channel'
    options =
      lookupdHTTPAddresses: '127.0.0.1:4161'

    reader = new nsq.Reader topic, channel, options
    reader.connect()

    reader.on nsq.Reader.MESSAGE, (msg) ->
      console.log "Received message [#{msg.id}]: #{msg.body.toString()}"
      msg.finish()

Publish a message to nsqd to be consumed by the sample client:

    $ curl -d "it really tied the room together" 'http://localhost:4151/pub?topic=sample_topic'

This script simulates a message that takes a long time to process, or at least longer than the default message timeout. To ensure that the message doesn't time out while being processed, touch events are sent to keep it alive.
JavaScript:

    var nsq = require('nsqjs');

    var reader = new nsq.Reader('sample_topic', 'test_channel', {
      lookupdHTTPAddresses: '127.0.0.1:4161'
    });

    reader.connect();

    reader.on('message', function (msg) {
      console.log('Received message [%s]', msg.id);

      function touch() {
        if (!msg.hasResponded) {
          console.log('Touch [%s]', msg.id);
          msg.touch();
          // Touch the message again a second before the next timeout.
          setTimeout(touch, msg.timeUntilTimeout() - 1000);
        }
      }

      function finish() {
        console.log('Finished message [%s]: %s', msg.id, msg.body.toString());
        msg.finish();
      }

      console.log('Message timeout is %f secs.', msg.timeUntilTimeout() / 1000);
      setTimeout(touch, msg.timeUntilTimeout() - 1000);

      // Finish the message after 2 timeout periods and 1 second.
      setTimeout(finish, msg.timeUntilTimeout() * 2 + 1000);
    });

CoffeeScript:

    {Reader} = require 'nsqjs'

    topic = 'sample_topic'
    channel = 'test_channel'
    options =
      lookupdHTTPAddresses: '127.0.0.1:4161'

    reader = new Reader topic, channel, options
    reader.connect()

    reader.on Reader.MESSAGE, (msg) ->
      console.log "Received message [#{msg.id}]"

      touch = ->
        unless msg.hasResponded
          console.log "Touch [#{msg.id}]"
          msg.touch()
          # Touch the message again a second before the next timeout.
          setTimeout touch, msg.timeUntilTimeout() - 1000

      finish = ->
        console.log "Finished message [#{msg.id}]: #{msg.body.toString()}"
        msg.finish()

      console.log "Message timeout is #{msg.timeUntilTimeout() / 1000} secs."
      setTimeout touch, msg.timeUntilTimeout() - 1000

      # Finish the message after 2 timeout periods and 1 second.
      setTimeout finish, msg.timeUntilTimeout() * 2 + 1000

nsqjs uses debug to log debug output.
To see all nsqjs events:

    $ DEBUG=nsqjs:* node my_nsqjs_script.js

To see all reader events:

    $ DEBUG=nsqjs:reader:* node my_nsqjs_script.js

To see a specific reader's events:

    $ DEBUG=nsqjs:reader:<topic>/<channel>:* node my_nsqjs_script.js

> Replace `<topic>` and `<channel>` with the reader's topic and channel.

To see all writer events:

    $ DEBUG=nsqjs:writer:* node my_nsqjs_script.js

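The `DEBUG` values above are wildcard patterns matched against logger namespaces. As a rough model of which namespaces a given value would switch on, the sketch below treats `*` as "any run of characters" and a comma as a separator between patterns. It is a simplification and not the `debug` package's own code (exclusion patterns are ignored), and the `:example` suffix used in the sample namespaces is hypothetical.

```javascript
// Simplified model of DEBUG namespace matching; not the debug package's code.
function escapeRegExp(text) {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Returns true when `namespace` matches any comma-separated pattern
// in `debugValue`, with `*` acting as a wildcard.
function isEnabled(debugValue, namespace) {
  return debugValue
    .split(',')
    .map((pattern) => pattern.trim())
    .filter((pattern) => pattern.length > 0)
    .some((pattern) => {
      const source = pattern.split('*').map(escapeRegExp).join('.*');
      return new RegExp(`^${source}$`).test(namespace);
    });
}

// Hypothetical namespaces, following the reader/writer patterns shown above.
const readerNs = 'nsqjs:reader:sample_topic/test_channel:example';
const writerNs = 'nsqjs:writer:example';

console.log(isEnabled('nsqjs:*', readerNs));
console.log(isEnabled('nsqjs:reader:*', writerNs));
console.log(isEnabled('nsqjs:reader:sample_topic/test_channel:*', readerNs));
```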
The writer sends a single message and then a list of messages.
JavaScript:

    var nsq = require('nsqjs');

    var w = new nsq.Writer('127.0.0.1', 4150);

    w.connect();

    w.on('ready', function () {
      w.publish('sample_topic', 'it really tied the room together');
      w.publish('sample_topic', [
        'Uh, excuse me. Mark it zero. Next frame.',
        'Smokey, this is not \'Nam. This is bowling. There are rules.'
      ]);
      w.publish('sample_topic', 'Wu?', function (err) {
        if (err) { return console.error(err.message); }
        console.log('Message sent successfully');
        w.close();
      });
    });

    w.on('closed', function () {
      console.log('Writer closed');
    });

CoffeeScript:

    {Writer} = require 'nsqjs'

    w = new Writer '127.0.0.1', 4150
    w.connect()

    w.on Writer.READY, ->
      w.publish 'sample_topic', 'it really tied the room together'
      w.publish 'sample_topic', ['Uh, excuse me. Mark it zero. Next frame.',
        'Smokey, this is not \'Nam. This is bowling. There are rules.']
      w.publish 'sample_topic', 'Wu?', (err) ->
        console.log 'Message sent successfully' unless err?
        w.close()

    w.on Writer.CLOSED, ->
      console.log 'Writer closed'

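`publish` accepts a string, a `Buffer`, a JSON serializable object, or a list of these. One way to picture how such mixed input could be brought into a single shape before it is written to nsqd is sketched below. `toBuffers` is a hypothetical helper, not the library's implementation, and it assumes that a top-level array is always a list of messages and that strings are encoded as UTF-8.

```javascript
// Hypothetical helper (not part of nsqjs): normalises the `msgs` argument
// of publish() into an array of Buffers.
// Assumption: a top-level array is always treated as a list of messages.
function toBuffers(msgs) {
  const list = Array.isArray(msgs) ? msgs : [msgs];
  return list.map((msg) => {
    if (Buffer.isBuffer(msg)) {
      return msg; // already binary, pass through untouched
    }
    if (typeof msg === 'string') {
      return Buffer.from(msg); // UTF-8 by default
    }
    return Buffer.from(JSON.stringify(msg)); // JSON serializable object
  });
}

const single = toBuffers('it really tied the room together');
const mixed = toBuffers(['Wu?', Buffer.from('raw bytes'), { frame: 8, score: 0 }]);

console.log(single.length, mixed.map((buf) => buf.toString()));
```

Normalising to an array up front means the single-message and multi-message cases can share one code path afterwards.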
`message.finish` is called after backoff event.
`maxAttempts` is now by default 0. [ Breaking Change! ]
`MESSAGE` listener if there's no `DISCARD` listener.
`debug` [ Breaking Change! ]
`close`, `pause`, and `unpause` to Reader
`finish`, `requeue`, etc after nsqd disconnect
`ReaderRdy`, `ConnectionRdy` implementation
`Reader` implementation
`NSQDConnection`
`Reader`
`connect()` now happens on next tick so that it can be called before event handlers are registered.
`Message`
`TOUCH` events
`NSQDConnection` implementation
`wire` implementation
`Message` implementation