Integrate Kafka with Drupal 8
Kafka:
Kafka is publish-subscribe messaging system. It is used in real-time streaming data architecture provide real-time analytics. It is fast, scalable, durable, and fault-tolerant.
Kafka is used for stream processing, website activity tracking, metrics collection and monitoring, log aggregation, real-time analytics, replay messages, error recovery, guaranteed distributed commit log for in-memory computing (microservices) etc.
Requirements:
- Zookeeper and kafka installation.
- "librdkafka" client library.
- "php-rdkafka" PHP extension.
- Drupal version >= 8.2.0
- Drupal contributed module kafka.
Zookeeper
- Installing apache zookeeper: Download zookeeper or wget http://www-us.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz in "/opt/" and tar -zxvf.
- create /opt/zookeeper_data directory by command "mkdir /opt/zookeeper_data".
- cd zookeeper-3.4.10.
- cd zookeeper-3.4.10/conf
- rename or cp zoo_sample.cfg zoo.cfg
- Now check for the properties in zoo.cfg file.
- vim conf/zoo.cfg
- Set following in zoo.cfg
- dataDir : set it "/opt/zookeeper_data".
- clientPort : set it 2181.
- ./bin/zkServer.sh start
- To verify status of zookeeper "jps".
- If zookeeper has started successfully then process will appear with process id : "QuorumPeerMain" like 10956 QuorumPeerMain.
Kafka
- To configure apache Kafka : Download kafka or wget http://www-us.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz in "/opt/" and tar -zxvf.
- cd kafka_2.11-1.0.0/config
- Now see file "server.properties" in config directory.
- Edit server.properties with command : vi config/server.properties
- Check for the properties in server.properties
- Borker_id=1 (By default it is 1)
- Port=9092 (If it is not present then by default it is 9092)
- Zookeeper.connect=127.0.0.1:2181(If it is localhost then no need to update)
Now start apache kafka brokers
./bin/kafka-server-start.sh -daemon config/server.properties
Now check running kafka precesses by command "jps".
Create topic
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic test-topic --partitions 2 --replication-factor 1
Sending message
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
After above commad type your message “Hi, its first test message.” And press enter.
Then press ctrl+z;
Consume message
./bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic --from-beginning
Install rdkafka php extension
To install gcc: sudo apt-get install gcc
To install C++ compiler: sudo apt-get install g++
To install make: sudo apt-get install build-essential
To install phpize: sudo apt install php7.0-dev
Install librdkafka library.
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure
sudo make install
sudo pecl install rdkafka
Then go to Drupal root directory.
composer require enqueue/rdkafka
composer require nmred/kafka-php
To install gcc: sudo apt-get install gcc
To install C++ compiler: sudo apt-get install g++
To install make: sudo apt-get install build-essential
To install phpize: sudo apt install php7.0-dev
Install librdkafka library.
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure
sudo make install
sudo pecl install rdkafka
Then go to Drupal root directory.
composer require enqueue/rdkafka
composer require nmred/kafka-php
Make entry of "extension=rdkafka.so" in etc/php/7.0/apache2/php.ini AND in etc/php/7.0/cli/php.ini.
Restart Apache.
Restart Apache.
Download and install drupal 8 contributed kafka module.
Create configuration in settings.php
Configure "settings.php" to expose kafka queue.
$settings['queue_default'] = 'queue.kafka';
$settings['queue_service_{queue_name}'] = 'queue.kafka';
$settings['kafka'] = [
'consumer' => [
'brokers' => ['127.0.0.1']
],
'producer' => [
'brokers' => ['127.0.0.1'],
],
];
Now you can check above created topic with drush command: drush kt
Now create custom module and use below code lines.
// Calling the Kafka producer service
$rk = \Drupal::service('kafka.producer');
$rk→setLogLevel(LOG_WARNING);
// Creating a topic.
$topic_name = 'kafka_test_topic';
$topic = $rk→newTopic($topic_name);
// Defining the content of the message (item)
$message = '{
"title": "' . $nodetitle . '",
"type": "' . $contenttype . '",
"properties": {
"node": {
"nid": "' . $nodeid . '"
}
}
}';
if ($topic) {
// Pushing the topic in drupal queue.
$q = \Drupal::queue($topic_name);
//Creating the drupal QUEUE
$q→createQueue();
//Creating the message (item)
$dev = $q->createItem($message);
}
Drupal 8 Custom module for kafka message
custom_kafka
custom_kafka.info.yml
custom_kafka.module
Now custom_kafka.info.yml
name: Custom kafka
description: custom module to integrate kafka with Drupal
package: custom
type: module
core: 8.x
custom_kafka.module
Now call below function to send kafka message on add/update/delete hook of entity.
/**
* Function to set message in queue.
*
* @param string $topic_name
* Topic Name.
* @param mixed $message
* Message.
*/
function kafka_message($topic_name, $message) {
$topic_message = 'Topic_Name:' . $topic_name . ' #Message_Value:';
try {
$q = \Drupal::queue($topic_name);
$q->createQueue();
$q->createItem($message);
$topic_message .= $message;
\Drupal::logger('custom_kafka')->info($topic_message);
}
catch (Exception $ex) {
$topic_message .= $ex->getMessage();
\Drupal::logger('custom_kafka')->error($topic_message);
}
}
Hello Raj,
ReplyDeleteCould you please tell me where and which file we need to write below code.
// Calling the Kafka producer service
$rk = \Drupal::service('kafka.producer');
$rk→setLogLevel(LOG_WARNING);
// Creating a topic.
$topic_name = 'kafka_test_topic';
$topic = $rk→newTopic($topic_name);
// Defining the content of the message (item)
$message = '{
"title": "' . $nodetitle . '",
"type": "' . $contenttype . '",
"properties": {
"node": {
"nid": "' . $nodeid . '"
}
}
}';
if ($topic) {
// Pushing the topic in drupal queue.
$q = \Drupal::queue($topic_name);
//Creating the drupal QUEUE
$q→createQueue();
//Creating the message (item)
$dev = $q->createItem($message);
}
Thank You!!!!