Simplified SQS Wrapper and Async Worker manager.
Features:
- Simple interface. ✅
- Promise based. ✅
- ES6. ✅
- Optimized async worker. ✅
# Using npm
$ npm install wtsqs --save
# Or using yarn
$ yarn add wtsqs
- WTSQS
A simplified SQS wrapper with an interface similar to a normal queue data structure.
- WTSQSWorker
WTSQS worker job manager.
WTSQSWorker takes care of asynchronously fetching jobs from SQS while processing other jobs concurrently. It also takes care of deleting a job from the queue after the job has been processed successfully.
WTSQS
A simplified SQS wrapper with an interface similar to a normal queue data structure.
Kind: global class
- WTSQS
  - new WTSQS(options)
  - .size() ⇒ Promise.<integer>
  - .enqueueOne(payload, [options], [sqsOptions]) ⇒ Promise
  - .enqueueMany(payloads, [options], [sqsOptions]) ⇒ Promise
  - .peekOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
  - .peekMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
  - .deleteOne(message) ⇒ Promise
  - .deleteMany(messages) ⇒ Promise
  - .deleteAll() ⇒ Promise
  - .popOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
  - .popMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
new WTSQS(options)
Constructs a WTSQS object.
| Param | Type | Default | Description |
|---|---|---|---|
| options | Object | | Options object. |
| options.url | String | | SQS queue url. |
| [options.accessKeyId] | String | | AWS access key id. |
| [options.secretAccessKey] | String | | AWS secret access key. |
| [options.region] | String | us-east-1 | AWS region where the queue exists. |
| [options.defaultMessageGroupId] | String | | FIFO queues only. Default tag assigned to a message that specifies it belongs to a specific message group. If not provided, a random uuid is assigned to each message, which doesn't guarantee order but allows parallelism. |
| [options.defaultVisibilityTimeout] | Integer | 60 | Default duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
| [options.defaultPollWaitTime] | Integer | 10 | Default duration (in seconds) for which read calls wait for a message to arrive in the queue before returning. |
| [options.sqsOptions] | Object | | Additional options to extend/override the underlying SQS object creation. |
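The options in the table above are often assembled from configuration rather than hard-coded. The following is a minimal sketch of that idea, assuming the values live in environment variables. `buildWtsqsOptions` and the `SQS_QUEUE_URL` variable name are hypothetical and not part of wtsqs; only the option names and the `us-east-1` default come from the table.

```javascript
// Sketch: assemble WTSQS constructor options from environment variables.
// SQS_QUEUE_URL is a hypothetical variable name chosen for this example.
function buildWtsqsOptions (env) {
  if (!env.SQS_QUEUE_URL) {
    throw new Error('SQS_QUEUE_URL is required')
  }
  const options = {
    url: env.SQS_QUEUE_URL,
    region: env.AWS_REGION || 'us-east-1' // documented default region
  }
  // Credentials are optional in the table above, so set them only when both exist.
  if (env.AWS_ACCESS_KEY_ID && env.AWS_SECRET_ACCESS_KEY) {
    options.accessKeyId = env.AWS_ACCESS_KEY_ID
    options.secretAccessKey = env.AWS_SECRET_ACCESS_KEY
  }
  return options
}
```

The result would be passed straight to the constructor, for example `new WTSQS(buildWtsqsOptions(process.env))`.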
Example
const { WTSQS } = require('wtsqs')
// The most simple way to construct a WTSQS object
const wtsqs = new WTSQS({
  url: '//queue-url',
  accessKeyId: 'AWS_ACCESS_KEY_ID',
  secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})

wtsqs.size() ⇒ Promise.<integer>
Get approximate total number of messages in the queue.
Kind: instance method of WTSQS
Example
const size = await wtsqs.size()
console.log(size) // output: 2

wtsqs.enqueueOne(payload, [options], [sqsOptions]) ⇒ Promise
Enqueue a single payload in the queue.
Kind: instance method of WTSQS
See: SQS#sendMessage
| Param | Type | Default | Description |
|---|---|---|---|
| payload | Object | | JSON serializable object. |
| [options] | Object | | Options. |
| [options.messageGroupId] | String | | Message group id to override default id. |
| [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS sendMessage request. |
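On FIFO queues, SQS preserves order among messages that share a message group id. The sketch below shows one way to use `options.messageGroupId` for that, sending related payloads one at a time under the same group. `enqueueInGroup` is a hypothetical helper, and `queue` stands for any object exposing `enqueueOne(payload, options)` as documented above.

```javascript
// Sketch: enqueue related payloads under one message group id, in order.
// `queue` is assumed to expose enqueueOne(payload, options) as documented.
async function enqueueInGroup (queue, groupId, payloads) {
  for (const payload of payloads) {
    // Awaiting each send before starting the next keeps the send order fixed.
    await queue.enqueueOne(payload, { messageGroupId: groupId })
  }
  return payloads.length
}
```

Sending sequentially trades throughput for a predictable order; payloads that do not need ordering relative to each other can go to different groups.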
Example
const myObj = { a: 1 }
await wtsqs.enqueueOne(myObj)

wtsqs.enqueueMany(payloads, [options], [sqsOptions]) ⇒ Promise
Enqueue a batch of payloads in the queue.
Kind: instance method of WTSQS
See: SQS#sendMessageBatch
| Param | Type | Default | Description |
|---|---|---|---|
| payloads | Array.<Object> | | Array of JSON serializable objects. |
| [options] | Object | | Options object. |
| [options.messageGroupId] | String | | Message group id to override default id. |
| [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS sendMessageBatch request. |
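The underlying SQS sendMessageBatch request accepts at most 10 entries. This document does not say whether `enqueueMany` splits larger arrays internally, so the sketch below splits them on the caller's side as a precaution. `chunk` and `enqueueInChunks` are hypothetical helpers, and `queue` stands for any object exposing `enqueueMany(payloads)`.

```javascript
// SQS allows at most 10 entries per sendMessageBatch request.
const SQS_BATCH_LIMIT = 10

// Split an array into consecutive slices of at most `size` items.
function chunk (items, size) {
  const chunks = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// Sketch: send a large list of payloads as a series of small batches.
async function enqueueInChunks (queue, payloads, size = SQS_BATCH_LIMIT) {
  for (const batch of chunk(payloads, size)) {
    await queue.enqueueMany(batch)
  }
}
```

If the library already chunks internally, this wrapper is redundant but harmless.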
Example
const myObjList = [{ a: 1 }, { b: 3 }]
await wtsqs.enqueueMany(myObjList)

wtsqs.peekOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
Retrieve a single message without deleting it.
Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.
| Param | Type | Default | Description |
|---|---|---|---|
| [options] | Object | | Options object. |
| [options.pollWaitTime] | Integer | | Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. |
| [options.visibilityTimeout] | Integer | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
| [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS receiveMessage request. |
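Because `peekOne` hides the message for the visibility timeout without deleting it, it can be combined with `deleteOne` to delete a message only after it has been handled. The following is a minimal sketch of that pattern. `processOne` is a hypothetical helper, and `queue` stands for any object exposing `peekOne` and `deleteOne` as documented here.

```javascript
// Sketch: handle one message and delete it only if the handler succeeds.
async function processOne (queue, handler, visibilityTimeout = 60) {
  const message = await queue.peekOne({ visibilityTimeout })
  if (message === null) return false // nothing was available
  // If the handler throws, deleteOne is never reached, so the message
  // becomes visible again once the visibility timeout expires.
  await handler(message.body)
  await queue.deleteOne(message)
  return true
}
```

The visibility timeout should be longer than the handler is expected to take, otherwise another consumer may receive the same message while it is still being processed.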
Example
const myMessage = await wtsqs.peekOne()
console.log(myMessage)
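// output:
// {
//   id: 'messageId',
//   receiptHandle: 'messageReceiptHandle',
//   md5: 'messageMD5',
//   body: { a: 1 }
// }

wtsqs.peekMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
Retrieve a batch of messages without deleting them.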
Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.
See: SQS#receiveMessage
| Param | Type | Default | Description |
|---|---|---|---|
| [maxNumberOfMessages] | Number | 10 | Maximum number of messages to retrieve. Must be between 1 and 10. |
| [options] | Object | | Options object. |
| [options.pollWaitTime] | Integer | | Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. |
| [options.visibilityTimeout] | Integer | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
| [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessageList = await wtsqs.peekMany(2)
console.log(myMessageList)
// output:
// [
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { a: 1 }
//   },
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { b: 3 }
//   }
// ]

wtsqs.deleteOne(message) ⇒ Promise
Delete a single message from the queue.
Kind: instance method of WTSQS
See: SQS#deleteMessage
| Param | Type | Description |
|---|---|---|
| message | Message | Message to be deleted. |
Example
const myMessage = await wtsqs.peekOne()
await wtsqs.deleteOne(myMessage)

wtsqs.deleteMany(messages) ⇒ Promise
Delete a batch of messages from the queue.
Kind: instance method of WTSQS
See: SQS#deleteMessageBatch
| Param | Type | Description |
|---|---|---|
| messages | Array.<Message> | Messages to be deleted. |
Example
const myMessageList = await wtsqs.peekMany(2)
await wtsqs.deleteMany(myMessageList)

wtsqs.deleteAll() ⇒ Promise
Delete ALL messages in the queue.
NOTE: Can only be called once every 60 seconds.
Kind: instance method of WTSQS
See: SQS#purgeQueue
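Since `deleteAll` can only be called once every 60 seconds, callers may want to guard against calling it too soon. The sketch below is one possible guard under the assumption that all purges go through the same process. `createPurgeGuard` and its injectable `now` clock are hypothetical and not part of wtsqs.

```javascript
const PURGE_INTERVAL_MS = 60 * 1000

// Sketch: wrap queue.deleteAll() so it runs at most once per 60 seconds.
// Only calls made through this guard are tracked; purges issued by other
// processes are invisible to it.
function createPurgeGuard (queue, now = Date.now) {
  let lastPurgeAt = -Infinity
  return async function purgeIfAllowed () {
    const current = now()
    if (current - lastPurgeAt < PURGE_INTERVAL_MS) return false // too soon
    lastPurgeAt = current // recorded before the call, even if it later fails
    await queue.deleteAll()
    return true
  }
}
```

The returned function resolves to `true` when a purge was issued and `false` when it was skipped.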
Example
await wtsqs.deleteAll()

wtsqs.popOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>
Retrieve a single message and immediately delete it.
Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.
| Param | Type | Default | Description |
|---|---|---|---|
| [options] | Object | | Options object. |
| [options.pollWaitTime] | Integer | | Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. |
| [options.visibilityTimeout] | Integer | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
| [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS receiveMessage request. |
Example
const myMessage = await wtsqs.popOne()
// The message no longer exists in queue
console.log(myMessage)
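// output:
// {
//   id: 'messageId',
//   receiptHandle: 'messageReceiptHandle',
//   md5: 'messageMD5',
//   body: { a: 1 }
// }

wtsqs.popMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>
Retrieve a batch of messages and immediately delete them.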
Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.
| Param | Type | Default | Description |
|---|---|---|---|
| [maxNumberOfMessages] | Number | 10 | Maximum number of messages to retrieve. Must be between 1 and 10. |
| [options] | Object | | Options object. |
| [options.pollWaitTime] | Integer | | Duration (in seconds) for which read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. |
| [options.visibilityTimeout] | Integer | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
| [sqsOptions] | Object | {} | Additional options to extend/override the underlying SQS receiveMessage request. |
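Because `popMany` returns at most 10 messages per call, reading everything currently in a queue takes a loop. The sketch below repeats `popMany` until a call comes back empty and treats that first empty batch as the end. `drain` and `onBatch` are hypothetical names, and `queue` stands for any object exposing `popMany(maxNumberOfMessages)`.

```javascript
// Sketch: pop batches until the queue returns an empty batch.
async function drain (queue, onBatch, batchSize = 10) {
  let total = 0
  while (true) {
    const messages = await queue.popMany(batchSize)
    if (messages.length === 0) break // treated as "queue is empty"
    // popMany has already deleted these messages, so an error thrown by
    // onBatch means this batch cannot be retried from the queue.
    await onBatch(messages.map((message) => message.body))
    total += messages.length
  }
  return total
}
```

When losing a batch on failure is not acceptable, the peek and delete methods are the safer building blocks.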
Example
const myMessageList = await wtsqs.popMany(2)
// Messages no longer exist in queue
console.log(myMessageList)
// output:
// [
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { a: 1 }
//   },
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { b: 3 }
//   }
// ]

WTSQSWorker
WTSQS worker job manager.
WTSQSWorker takes care of asynchronously fetching jobs from SQS while processing other jobs concurrently. It also takes care of deleting a job from the queue after the job has been processed successfully.
Kind: global class
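To make "processing other jobs concurrently" more concrete, the sketch below shows bounded concurrency as a general technique: a fixed number of lanes pull jobs from a shared list, so no more than `maxConcurrency` handlers run at once. It illustrates the idea only and is not WTSQSWorker's actual implementation; `runWithConcurrency` is a hypothetical name.

```javascript
// Sketch: run `handler` over `jobs` with at most `maxConcurrency` in flight.
async function runWithConcurrency (jobs, handler, maxConcurrency) {
  const results = new Array(jobs.length)
  let next = 0 // index of the next job to claim, shared by all lanes

  // Each lane claims the next unclaimed job, handles it, then repeats.
  async function lane () {
    while (next < jobs.length) {
      const index = next++
      results[index] = await handler(jobs[index])
    }
  }

  const laneCount = Math.min(maxConcurrency, jobs.length)
  await Promise.all(Array.from({ length: laneCount }, () => lane()))
  return results
}
```

A real worker would fetch new jobs from the queue as lanes free up instead of starting from a fixed list.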
- WTSQSWorker
  - new WTSQSWorker(options)
  - instance
    - .run(handler)
    - .shutdown() ⇒ Promise
  - inner
    - ~runHandler ⇒ Promise
new WTSQSWorker(options)
Constructs a WTSQSWorker object.
| Param | Type | Default | Description |
|---|---|---|---|
| options | Object | | Options object. |
| options.wtsqs | WTSQS | | WTSQS instance to use for connecting to sqs. |
| [options.maxConcurrency] | Integer | 20 | Maximum number of concurrent jobs. |
| [options.pollWaitTime] | Integer | 5 | Duration (in seconds) for which read calls wait for a job to arrive in the queue before returning. |
| [options.visibilityTimeout] | Integer | 30 | Duration (in seconds) that the received jobs are hidden from subsequent retrieve requests. |
| [options.logger] | Object \| String | | Object with debug, info, warn, error methods to use for logging. Or a string with log level to use default internal logger. |
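The `options.logger` row asks for an object with `debug`, `info`, `warn` and `error` methods. The sketch below builds such an object by forwarding to `console` with a prefix. `createLogger` is a hypothetical helper, and which arguments WTSQSWorker passes to these methods is not documented here, so the sketch simply forwards whatever it receives.

```javascript
// Sketch: a logger object exposing the four methods named in the table.
// `sink` defaults to console; any object with the same methods works.
function createLogger (prefix, sink = console) {
  const logger = {}
  for (const level of ['debug', 'info', 'warn', 'error']) {
    logger[level] = (...args) => sink[level](`[${prefix}]`, ...args)
  }
  return logger
}
```

It would be passed as `new WTSQSWorker({ wtsqs, logger: createLogger('worker') })`.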
Example
const { WTSQS, WTSQSWorker } = require('wtsqs')
const wtsqs = new WTSQS({
url: '//queue-url',
accessKeyId: 'AWS_ACCESS_KEY_ID',
secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})
const worker = new WTSQSWorker({ wtsqs })
worker.run(async (job) => {
  await someAsyncFunction(job.body)
  console.log(job)
})

worker.run(handler)
Start fetching and processing jobs.
Kind: instance method of WTSQSWorker
| Param | Type | Description |
|---|---|---|
| handler | runHandler | Async function to process a single job. |
worker.shutdown() ⇒ Promise
Shuts down the worker and drains active jobs.
Kind: instance method of WTSQSWorker
Returns: Promise - Resolves when all active jobs have been drained.
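Since `shutdown()` resolves only after active jobs have drained, it fits naturally into a process signal handler. The following is a minimal sketch of that wiring. `installGracefulShutdown` is a hypothetical helper, the choice of `SIGINT` and `SIGTERM` is an assumption, and `worker` stands for any object exposing `shutdown()`.

```javascript
// Sketch: call worker.shutdown() on termination signals, then exit.
// `proc` defaults to the Node.js process object and is injectable for tests.
function installGracefulShutdown (worker, proc = process, signals = ['SIGINT', 'SIGTERM']) {
  let shuttingDown = false
  const onSignal = async () => {
    if (shuttingDown) return // ignore repeated signals while draining
    shuttingDown = true
    let exitCode = 0
    try {
      await worker.shutdown() // resolves once active jobs have drained
    } catch (err) {
      exitCode = 1
    }
    proc.exit(exitCode)
  }
  for (const signal of signals) proc.on(signal, onSignal)
  return onSignal
}
```

Calling `installGracefulShutdown(worker)` once after `worker.run(...)` would be enough in a typical service.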
WTSQSWorker~runHandler ⇒ Promise
Async callback function to process a single job.
Kind: inner typedef of WTSQSWorker
| Param | Type | Description |
|---|---|---|
| job | Job | A single job to process. |
Message
Received SQS message.
Kind: global typedef
Properties
| Name | Type | Description |
|---|---|---|
| id | String | Message id. |
| receiptHandle | String | Message receipt handle. |
| md5 | String | Message body md5 hash sum. |
| body | Object | Message body containing original payload. |
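The properties above can be written down as a JSDoc typedef together with a small runtime check, which may help when messages pass through other code before reaching `deleteOne` or `deleteMany`. This is a sketch only: `isMessage` is a hypothetical helper that checks the four documented properties and nothing else.

```javascript
/**
 * @typedef {Object} Message
 * @property {string} id Message id.
 * @property {string} receiptHandle Message receipt handle.
 * @property {string} md5 Message body md5 hash sum.
 * @property {Object} body Message body containing original payload.
 */

// Sketch: check that a value has the shape described in the table above.
function isMessage (value) {
  return (
    value !== null &&
    typeof value === 'object' &&
    typeof value.id === 'string' &&
    typeof value.receiptHandle === 'string' &&
    typeof value.md5 === 'string' &&
    value.body !== null &&
    typeof value.body === 'object'
  )
}
```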
Job
Worker job.
Kind: global typedef
Properties
| Name | Type | Description |
|---|---|---|
| id | String | Job id. |
| receiptHandle | String | Job receipt handle. |
| md5 | String | Job body md5 hash sum. |
| body | Object | Job body containing original payload. |