# Using Kafka in Nodejs
# Installing Kafka
Before we can use Kafka in our Node.js application, we need to install Kafka on our system. You can download Kafka from the official website: https://kafka.apache.org/downloads
Once you have downloaded Kafka, extract the files to a directory of your choice. You can then start ZooKeeper and the Kafka broker by running the following commands from that directory, each in its own terminal:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
# Basic
# Creating a Kafka Producer
const kafka = require('kafka-node');
const Producer = kafka.Producer;

// Client and producer used to publish to the broker.
const client = new kafka.KafkaClient();
const producer = new Producer(client);

// One message for the 'test-topic' topic.
const payloads = [
  { topic: 'test-topic', messages: 'Hello World!' },
];

// Send only once the producer reports 'ready'; before that event it is not
// connected to the broker yet and a send() may fail.
producer.on('ready', () => {
  console.log('Kafka producer is ready');
  producer.send(payloads, (err, data) => {
    if (err) {
      console.error('Error occurred while sending message:', err);
      return;
    }
    console.log('Message sent:', data);
  });
});

producer.on('error', (err) => {
  console.error('Error occurred while creating Kafka producer:', err);
});
# Creating a Kafka Consumer
const kafka = require('kafka-node');
const { Consumer } = kafka;
const client = new kafka.KafkaClient();

// Subscribe to partition 0 of 'test-topic' with automatic offset commits turned off.
const consumer = new Consumer(client, [{ topic: 'test-topic', partition: 0 }], { autoCommit: false });

// Report consumer errors.
consumer.on('error', (err) => {
  console.error('Error occurred while creating Kafka consumer:', err);
});

// Log every message delivered to this consumer.
consumer.on('message', (message) => {
  console.log('Received message:', message);
});
# Partition
# Setup
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic
# Producer
// Address partition 0 of 'my-topic' explicitly.
const payloads = [
  { topic: 'my-topic', messages: 'Hello World!', partition: 0 },
];
producer.send(payloads, (err, data) => {
  // Report a failed send instead of silently logging `undefined`.
  if (err) {
    console.error('Error occurred while sending message:', err);
    return;
  }
  console.log(data);
});
# Consumer
// Read only partition 0 of 'my-topic'; automatic offset commits are disabled.
const consumer = new Consumer(client, [{ topic: 'my-topic', partition: 0 }], { autoCommit: false });
# Manual ack / commit
// autoCommit is off, so offsets are committed only by the explicit commit() below.
const consumer = new Consumer(
  client,
  [{ topic: 'test-topic', partition: 0 }],
  { autoCommit: false }
);

consumer.on('message', (message) => {
  console.log('Received message:', message.value);
  // Perform some processing on the message

  // Commit after processing so the offset is not acknowledged before the work is done.
  // NOTE(review): kafka-node appears to throttle non-forced commits by
  // autoCommitIntervalMs; confirm whether commit(true, cb) is needed to commit every message.
  consumer.commit((error, data) => {
    if (error) {
      console.error('Error committing message:', error);
      return;
    }
    console.log('Message committed:', data);
  });
});

// Without an 'error' listener, an emitted 'error' event is thrown as an uncaught exception.
consumer.on('error', (err) => {
  console.error('Error occurred while consuming messages:', err);
});
# Batch processing
# Producer
// Each payload targets one topic and carries an array of messages, so a single
// send() call delivers several messages to several topics.
const batchPayloads = [
  { topic: 'my-topic', messages: ['message 1', 'message 2', 'message 3'] },
  { topic: 'my-other-topic', messages: ['message 4', 'message 5', 'message 6'] },
];

producer.send(batchPayloads, (err, data) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(data);
});
# Consumer
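The consumer fetches messages from Kafka in batches; the fetch settings
(maximum wait time, minimum bytes, and maximum bytes) control how much data each fetch request returns.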
// Batch size is tuned through the Consumer's fetch options; kafka-node issues the
// fetch requests itself and delivers each fetched message through the 'message' event.
const consumer = new kafka.Consumer(
  client,
  [{ topic: 'my-topic' }],
  {
    // Maximum time in ms the broker blocks when too little data is available.
    fetchMaxWaitMs: 1000,
    // Minimum number of bytes that must be available before the broker responds.
    fetchMinBytes: 1,
    // Maximum number of bytes to include in one fetched message set.
    fetchMaxBytes: 1024 * 1024,
  }
);

consumer.on('message', (message) => {
  console.log(message);
});

consumer.on('error', (err) => {
  console.error(err);
});
# Dead letter queue (DLQ)
A dead letter queue (DLQ) is a queue where messages that cannot be processed are sent. In Kafka, a DLQ can be implemented using a combination of topics and consumer groups. Here's how to use a DLQ in Kafka:
# Create topic
# Main topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
# DLQ for main topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic-dlq
# Consumer group
Consumer group for the main topic
const kafka = require('kafka-node');
const ConsumerGroup = kafka.ConsumerGroup;

// Settings for the consumer group that reads the main topic.
const mainGroupOptions = {
  kafkaHost: 'localhost:9092',
  groupId: 'my-group',
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
};

// A single producer, created once, forwards failed messages to the DLQ.
// (Creating a client and producer per failed message would open a new
// connection for every failure.)
const dlqClient = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
const dlqProducer = new kafka.Producer(dlqClient);

// Resolves once the producer is connected, so DLQ sends never run before 'ready'.
const dlqProducerReady = new Promise((resolve) => {
  dlqProducer.once('ready', resolve);
});

dlqProducer.on('error', (err) => {
  console.error('DLQ producer error:', err);
});

const consumerGroup = new ConsumerGroup(mainGroupOptions, 'my-topic');

consumerGroup.on('error', (err) => {
  console.error('Consumer group error:', err);
});

// Set up a listener for the main topic:
consumerGroup.on('message', (message) => {
  try {
    // process message
  } catch (err) {
    // send message to DLQ
    dlqProducerReady.then(() => {
      dlqProducer.send(
        [{ topic: 'my-topic-dlq', messages: [message.value] }],
        (sendErr, data) => {
          if (sendErr) {
            console.error('Failed to send message to DLQ:', sendErr);
            return;
          }
          console.log(data);
        }
      );
    });
  }
});
Consumer group for the DLQ
// Settings for the separate consumer group that reads the dead letter queue.
const dlqGroupOptions = {
  kafkaHost: 'localhost:9092',
  groupId: 'my-group-dlq',
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
};

const consumerGroupDLQ = new ConsumerGroup(dlqGroupOptions, 'my-topic-dlq');

// Handle messages that could not be processed on the main topic.
consumerGroupDLQ.on('message', (message) => {
  // process message
});
← Bookmarklet Firebase →