Monday 25 March 2019

Integrate Kafka with Drupal 8

Integrate Kafka with Drupal 8

Kafka:

Kafka is publish-subscribe messaging system. It is used in real-time streaming data architecture provide real-time analytics. It is fast, scalable, durable, and fault-tolerant. 

Kafka is used for stream processing, website activity tracking, metrics collection and monitoring, log aggregation, real-time analytics, replay messages, error recovery, guaranteed distributed commit log for in-memory computing (microservices) etc.

Requirements:
  • Zookeeper and kafka installation.
  • "librdkafka" client library.
  • "php-rdkafka" PHP extension.
  • Drupal version >= 8.2.0
  • Drupal contributed module kafka.
Install Zookeeper & Kafka
Zookeeper
  • Installing apache zookeeper: Download zookeeper or wget http://www-us.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz in "/opt/" and tar -zxvf.
  • create /opt/zookeeper_data directory by command "mkdir /opt/zookeeper_data".
  • cd zookeeper-3.4.10.
  • cd zookeeper-3.4.10/conf
  • rename or cp zoo_sample.cfg zoo.cfg
  • Now check for the properties in zoo.cfg file.
  • vim conf/zoo.cfg
  • Set following in zoo.cfg
  • dataDir : set it "/opt/zookeeper_data".
  • clientPort : set it 2181.
  • ./bin/zkServer.sh start
  • To verify status of zookeeper "jps".
  • If zookeeper has started successfully then process will appear with process id : "QuorumPeerMain" like 10956 QuorumPeerMain.

Kafka
  • To configure apache Kafka : Download kafka or wget http://www-us.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz  in "/opt/" and tar -zxvf.
  • cd kafka_2.11-1.0.0/config
  • Now see file "server.properties" in config directory.
  • Edit server.properties with command : vi config/server.properties
  • Check for the properties in server.properties
  • Borker_id=1 (By default it is 1)
  • Port=9092 (If it is not present then by default it is 9092)
  • Zookeeper.connect=127.0.0.1:2181(If it is localhost then no need to update)
Now start apache kafka brokers


./bin/kafka-server-start.sh -daemon config/server.properties
Now check running kafka precesses by command "jps".




Create topic
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic test-topic --partitions 2 --replication-factor 1
 Output will : Created topic "test-topic".

Sending message
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

After above commad type your message “Hi, its first test message.” And press enter.
Then press ctrl+z;

Consume message
./bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic --from-beginning

Install rdkafka php extension
To install gcc: sudo apt-get install gcc
To install C++ compiler: sudo apt-get install g++
To install make: sudo apt-get install build-essential
To install phpize: sudo apt install php7.0-dev

Install librdkafka library.
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure
sudo make install
sudo pecl install rdkafka

Then go to Drupal root directory.

composer require enqueue/rdkafka
composer require nmred/kafka-php

Make entry of "extension=rdkafka.so" in etc/php/7.0/apache2/php.ini AND in etc/php/7.0/cli/php.ini.
Restart Apache.

Download and install drupal 8 contributed kafka module.
Create configuration in settings.php
Configure "settings.php" to expose kafka queue.
$settings['queue_default'] = 'queue.kafka';
$settings['queue_service_{queue_name}'] = 'queue.kafka';
$settings['kafka'] = [
  'consumer' => [
'brokers' => ['127.0.0.1']
  ],
  'producer' => [
'brokers' => ['127.0.0.1'],
  ],

];

Now you can check above created topic with drush command: drush kt

Now create custom module and use below code lines.

// Calling the Kafka producer service
$rk = \Drupal::service('kafka.producer');
$rk→setLogLevel(LOG_WARNING);

// Creating a topic.
$topic_name = 'kafka_test_topic';
$topic = $rk→newTopic($topic_name);

// Defining the content of the message (item)
$message = '{
  "title": "' . $nodetitle . '",
  "type": "' . $contenttype . '",
  "properties": {
    "node": {
      "nid": "' . $nodeid . '"
    }
  }
 }';
if ($topic) {
  // Pushing the topic in drupal queue.
  $q = \Drupal::queue($topic_name);

  //Creating the drupal QUEUE
  $q→createQueue();

  //Creating the message (item)
  $dev = $q->createItem($message);
}

Drupal 8 Custom module for kafka message
custom_kafka
custom_kafka.info.yml
custom_kafka.module

Now custom_kafka.info.yml
name: Custom kafka
description: custom module to integrate kafka with Drupal
package: custom
type: module
core: 8.x

custom_kafka.module
Now call below function to send kafka message on add/update/delete hook of entity.
/**
 * Function to set message in queue.
 *
 * @param string $topic_name
 *   Topic Name.
 * @param mixed $message
 *   Message.
 */
function kafka_message($topic_name, $message) {
  $topic_message = 'Topic_Name:' . $topic_name . ' #Message_Value:';
  try {
    $q = \Drupal::queue($topic_name);
    $q->createQueue();
    $q->createItem($message);
    $topic_message .= $message;
    \Drupal::logger('custom_kafka')->info($topic_message);
  }
  catch (Exception $ex) {
    $topic_message .= $ex->getMessage();
    \Drupal::logger('custom_kafka')->error($topic_message);
  }
}


1 comment:

  1. Hello Raj,
    Could you please tell me where and which file we need to write below code.
    // Calling the Kafka producer service
    $rk = \Drupal::service('kafka.producer');
    $rk→setLogLevel(LOG_WARNING);

    // Creating a topic.
    $topic_name = 'kafka_test_topic';
    $topic = $rk→newTopic($topic_name);

    // Defining the content of the message (item)
    $message = '{
    "title": "' . $nodetitle . '",
    "type": "' . $contenttype . '",
    "properties": {
    "node": {
    "nid": "' . $nodeid . '"
    }
    }
    }';
    if ($topic) {
    // Pushing the topic in drupal queue.
    $q = \Drupal::queue($topic_name);

    //Creating the drupal QUEUE
    $q→createQueue();

    //Creating the message (item)
    $dev = $q->createItem($message);
    }

    Thank You!!!!

    ReplyDelete