Task queues are a powerful tool for handling tasks that are time-consuming or need to be performed in a specific order. They allow you to add tasks asynchronously and have them processed in a separate worker process, which can be more efficient and scalable than performing all tasks in the same process.
In this article, we’ll look at a simple example of a task queue implemented using Node.js, ExpressJS, Redis, and the ioredis library. Note that you’ll need Redis installed on your system, or you can follow the steps later in this article to spin up a Redis container with Docker.
Main and Worker Process
Our task queue has two main parts: a main process that adds tasks to the queue and a worker process that processes tasks from the queue. The two communicate through a Redis database using the ioredis library, a popular Redis client for Node.js. When a task is added to the queue, the server notifies the client through the callbackUrl provided in the task data, letting it know the task has been queued.
Initialize the project
I’ll be using Yarn as my package manager throughout this project. You can use NPM as well.
Let’s initialize the project with the below command.
yarn init
You’ll be prompted to enter details about the project. Once done, install the dependencies with the below command.
yarn add axios express ioredis concurrently
You’ll also need Nodemon as a devDependency, which can be installed with the below command.
yarn add nodemon -D
Here is what my package.json looks like.
{
  "name": "ioredis",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "scripts": {
    "start": "concurrently \"yarn start:main\" \"yarn start:worker\"",
    "start:main": "node index.js",
    "start:worker": "node worker.js",
    "dev": "nodemon index.js",
    "worker": "nodemon worker.js"
  },
  "dependencies": {
    "axios": "^1.2.1",
    "concurrently": "^7.6.0",
    "express": "^4.18.2",
    "ioredis": "^5.2.4"
  },
  "devDependencies": {
    "nodemon": "^2.0.20"
  }
}
Setting up Redis as Docker Container
This project needs Redis as the database for the queuing process. We’ll use Docker and Docker Compose to quickly spin up a Redis container.
In your project folder, create a new file called docker-compose.yml and paste the below content inside it.
version: "3"
services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
Now, run the below command to spin up the Redis container.
docker-compose up -d
Now, you can verify whether the Docker container is running by using the below command.
docker ps
If Redis is up and running, let’s move on to the Node.js part.
Setting up Main and Worker
Create an index.js file and paste the below content inside it. In the main process, we use the express library to create an HTTP server and the express.json() middleware to parse request bodies as JSON.
const express = require("express");
const Redis = require("ioredis");
const axios = require("axios");

const app = express();
app.use(express.json());

// Connect to the Redis server (localhost:6379 by default)
const redis = new Redis();

app.post("/add_task", (req, res) => {
  // Generate a unique ID for the task
  const id = Date.now().toString();
  console.log(`Task added to queue with id ${id}`);

  // Add the task to the queue
  redis.lpush("queue", JSON.stringify({ id, data: req.body }));

  // Notify the client that the task has been queued
  axios
    .post(req.body.callbackUrl, {
      message: `Task added to queue with id ${id}`,
    })
    .catch((err) => console.error(`Callback failed: ${err.message}`));

  res.send("Task added to queue");
});

app.listen(3000, () => {
  console.log("Express server listening on port 3000");
});
We handle these requests with a route handler for the /add_task path. When a request is received, we generate a unique ID for the task using the current timestamp and add the task to the queue using the lpush command of the ioredis library. This command pushes the task data (which we stringify as JSON) onto the left side of the “queue” list in Redis.
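One caveat worth noting: Date.now() alone isn’t guaranteed unique, since two requests arriving in the same millisecond would get the same ID. A minimal sketch of a safer ID generator (the makeTaskId helper name is my own, not part of the article’s code):

```javascript
// Hypothetical helper: Date.now() alone can collide when two requests
// arrive in the same millisecond, so append a short random suffix.
function makeTaskId() {
  const timestamp = Date.now().toString(36);
  const suffix = Math.random().toString(36).slice(2, 8);
  return `${timestamp}-${suffix}`;
}

console.log(makeTaskId()); // e.g. "lw5xkz0h-4g9qpd"
```

For heavier workloads you might instead reach for a library like uuid, but a timestamp-plus-random suffix keeps this example dependency-free.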
After adding the task to the queue, we send a notification to the client using the callbackUrl provided in the task data. We use the axios library to make an HTTP POST request to the callbackUrl with a simple message indicating that the task has been added to the queue.
Finally, we send a response back to the client to confirm that the task has been added to the queue.
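If the left-push ordering feels abstract, a plain JavaScript array can model the Redis list: unshift stands in for lpush and pop stands in for rpop (the command the worker uses). This is just an analogy, not real Redis calls:

```javascript
// Model the Redis list with a plain array to see the ordering:
// LPUSH adds at the head (index 0), RPOP removes from the tail,
// so the first task pushed is the first task popped (FIFO).
const queue = [];

// lpush "queue" task-1, then task-2, then task-3
["task-1", "task-2", "task-3"].forEach((t) => queue.unshift(t));

console.log(queue); // [ 'task-3', 'task-2', 'task-1' ]
console.log(queue.pop()); // 'task-1' — the oldest task comes out first
```

This lpush/rpop pairing is what makes the list behave as a first-in, first-out queue.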
Worker Process
Now create another file called worker.js and paste the below code.
The worker code is responsible for processing tasks from the queue and sending a notification to the client when each task is complete. It uses the same ioredis library as the server to connect to Redis and retrieves tasks from the queue using the rpop command. This command removes the rightmost element (i.e. the oldest task in the queue) from the “queue” list and returns it to the worker.
const Redis = require("ioredis");
const axios = require("axios");

const redis = new Redis();

async function processQueue() {
  // Take the oldest task from the right side of the list
  const raw = await redis.rpop("queue");
  if (!raw) {
    // Queue is empty: check again on the next tick
    setImmediate(processQueue);
    return;
  }

  const task = JSON.parse(raw);
  try {
    console.log(`Processing task: ${task.id}`);
    // Simulate a time-consuming operation
    await new Promise((resolve) => setTimeout(resolve, 5000));

    console.log(`Task complete: ${task.id}`);
    // Send a notification to the client's callbackUrl
    await axios.post(task.data.callbackUrl, {
      message: `Task completed ${task.id}`,
    });
  } catch (error) {
    // In case of failure, add the task back to the queue
    console.error(`Task failed: ${task.id}`);
    await redis.lpush("queue", JSON.stringify(task));
  }

  setImmediate(processQueue);
}

processQueue();
If no task is available, the worker schedules itself to run again using the setImmediate function, so it continuously checks for new tasks. Be aware that this polls Redis as fast as possible; in a production setup you’d typically use Redis’s blocking brpop command, or add a small delay between empty polls, to avoid burning CPU and Redis round trips.
If a task is available, the worker processes it by simulating a time-consuming operation using the Promise constructor and the setTimeout function. In a real-world scenario, this would be replaced with code that performs the actual task, such as sending an email, saving data to a database, or making an HTTP request to another service.
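One common way to organize that real work is a handler map keyed by task type. The sketch below is purely illustrative — the “email” and “report” handlers and their payload fields are invented names, not part of this project:

```javascript
// Hypothetical handler map: dispatch each task to real work by type.
const handlers = {
  email: async (data) => `email sent to ${data.to}`,
  report: async (data) => `report generated for ${data.month}`,
};

// Look up the handler for a task and run it; unknown types fail loudly
// so the worker's catch block can re-queue or drop them.
async function runTask(task) {
  const handler = handlers[task.type];
  if (!handler) throw new Error(`Unknown task type: ${task.type}`);
  return handler(task.data);
}

runTask({ type: "email", data: { to: "user@example.com" } }).then(console.log);
```

With this shape, the worker’s try block would simply await runTask(task) instead of the setTimeout simulation.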
If an error occurs while processing the task, the worker handles it by logging an error message and adding the task back to the queue using the lpush command. This allows the task to be retried.
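One caveat with this simple retry: a task that always fails would be re-queued forever. A common refinement, sketched here under assumed names (nextAttempt and the limit of 3 are my own, not the article’s), is to carry a retry count on the task itself:

```javascript
// Hypothetical sketch: cap retries so a permanently failing ("poison")
// task doesn't loop through the queue forever.
const MAX_RETRIES = 3;

function nextAttempt(task) {
  const retries = (task.retries || 0) + 1;
  if (retries > MAX_RETRIES) {
    // Give up: a real system might move it to a dead-letter list instead
    return { action: "drop", task };
  }
  // Re-queue with the incremented retry count attached to the task
  return { action: "requeue", task: { ...task, retries } };
}
```

The worker’s catch block would then lpush the returned task only when the action is “requeue”.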
After the task completes successfully, the worker sends a notification to the client using the callbackUrl provided in the task data. We again use the axios library to make an HTTP POST request to the callbackUrl with a message indicating that the task has been completed.
Finally, the worker schedules itself to run again using setImmediate, so it can continue processing tasks from the queue.
Testing the Project
Now, run the start command to launch both the server and the worker.
yarn start
Now send a POST request to http://localhost:3000/add_task with the below payload, and the task will be added to the Redis queue under the key “queue”. For the callbackUrl, paste the unique URL you get from https://webhook.site/.
{
  "task": "Task",
  "callbackUrl": "https://webhook.site/URL_GIVEN_BY_THEM"
}
Once you send the request to the above endpoint, the client receives the “Task added to queue” response, and the callback URL receives a message saying the task has been added to the queue. Once the task has been processed, the callback URL receives a second message confirming completion.
In case of failure, the task is added back to the queue and processed again.
You can modify the source code as you wish to achieve the functionality you want. This is just an explanation of how this can be done.
Also, you can clone this project from GitHub and start running immediately.
git clone https://github.com/WebCheerz/express-redis-task-queue
Now, go inside that directory and run the below commands.
Start the Redis container.
docker-compose up -d
Install the dependencies
yarn install
Now, start the project.
yarn start
In this article, we’ve looked at a simple example of a task queue implemented using Node.js, Redis, and the ioredis library. This solution allows you to add tasks asynchronously and have them processed in a separate worker process, which can be more efficient and scalable than performing all tasks in the same process.
There are many additional considerations to keep in mind when implementing a task queue, such as scalability, performance, reliability, and security. However, the basic principles outlined in this article should give you a good starting point for building your own task queue.