Интеграция с kafka

В этом уроке объясняется, как использовать Корреляторы событий для получения событий из источников данных Kafka.

Зависимости

Для использования Kafka в качестве источников данных, необходимо установить следующие зависимости:

Все расширения должны быть установлены в директорию расширений, указанную в свойствах плагина Корреляторы Событий. Библиотека Kafka-clients должна быть установлена в поддиректорую lib установочной папки SberMobile.

Об источниках данных kafka

В Kafka сообщения создаются продюсерами. Продюсером может быть, например, устройство, отправляющее данные о температуре. Как только сообщение от продюсера достигает кластера Kafka, оно сохраняется и публикуется в топике. Получатели могут подписаться на топики, чтобы получать сохраненные сообщения.

В каждом топике есть одна или более партиций. Партиция действует как отдельный канал внутри топика и имеет идентификатор для доступа к ней. Партиции нумеруются, начиная с 0. Сообщения присваиваются определенной партиции на основании значения поля ключа, хранящегося в сообщении. Например, если несколько устройств включат свои идентификаторы в сообщения, каждому устройству может быть назначена отдельная партиция.

Движок коррелятора выступает в роли потребителя для топиков Kafka. Поток Kafka может подписаться на топик и получать данные из всех его партиций. Когда поток получает события, его можно использовать как обычный поток коррелятора. Например, вы можете отфильтровать входящий поток или обнаружить шаблоны во входящих событиях.

Источники данных kafka

Как и любой другой поток, потоки Kafka используют декораторы для определения параметров потока.

Более подробно о том, как работают потоки коррелятора событий, см. в разделе Скрипты коррелятора событий.

Пример входного потока Kafka:

@Source(

    type="kafka", 

    bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092", 

    topic.list="example", group.id="correlator", 

    threading.option="single.thread", 

    @map(type='json'))

 

define stream InKafkaTemperature (device string, temperature int);

 

Параметры потока Kafka @Source:

Параметр

Опциональный

Описание

type

Нет

Тип входного потока. Для Kafka источников значение всегда kafka.

bootstrap.servers

Нет

Список хостов Kafka, разделенных запятой. Движок коррелятора будет получать события от указанных хостов.

topic.list

Нет

Список топиков, разделенных запятой. Поток будет получать события из всех указанных топиков.

group.id

Нет

Имя группы получателей. Если вы хотите разделить сообщения из одного топика между несколькими входящими потоками, используйте то же самое имя группы. Это позволит избежать получения потоками дублей событий.

threading.option

Нет

Определяет способ обработки источника. Возможные значения:

  • single.thread. Источник будет работать на одном потоке.
  • topic.wise. Каждый топик будет работать на отдельном потоке.
  • partition.wise. Для каждой партиции будет использован отдельный поток.

seq.enabled

Да

Опция используется, когда важна последовательность получаемых событий. В этом случае, поток должен определить признак, который будет выступать идентификатором. Параметр необязательный и по умолчанию выставлен на false.

is.binary.message

Да

Если события в бинарном формате, этот параметр должен быть выставлен на true. Параметр необязательный и по умолчанию выставлен на false.

topic.offset.map

Да

Определяет офсеты топиков в формате <topic> = <offset>, <topic> = <offset>. Офсет будет определять, сколько сообщений от начала топика будут пропущены. Например, если значение офсета равно 100, то первые 100 сообщений будут пропущены, когда движок коррелятора будет читать топик. Параметр необязательный, и по умолчанию сообщения не пропускаются.

optional.configuration

Да

Определяет все остальные параметры настроек получателя. Формат parameter:value, parameter:value. Параметр необязательный.

@map

Нет

Маппер входных данных. Расширение Kafka поддерживает входные данные в форматах текс, xml, json, а также в бинарном формате. Этот формат можно указать в параметре маппера. Например, для получения данных в формате json укажите маппер @map(type='json') для входного потока.

Пример

Следующий скрипт получает данные из семи партиций (0-6) топика температура. Поток подключится к двум серверам из кластера Kafka: kafka1.example.com:9091 и kafka2.example.com:9092. Входящий поток имеет два поля: устройство и температура. Для любых событий со значением температуры выше 100, генерируется событие в исходящем потоке.

@Source(

    type="kafka", 

    bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092", 

    topic.list="temperature", group.id="correlator", 

    threading.option="single.thread", 

    partition.no.list="0,1,2,3,4,5,6", 

    seq.enabled="false", 

    is.binary.message="false", 

    @map(type='json'))

 

define stream InKafkaTemperature (device string, temperature int);

 

 @Sink(type = 'internalEvent', event='test', @map(type='internalEvent'))

define stream OutKafkaTemperature (device string, temperature int);

 

from InKafkaTemperature[temperature > 100]

select 

    device as message,

    temperature as temperature

insert into OutKafkaTemperature;