Интеграция с kafka
В этом уроке объясняется, как использовать Корреляторы событий для получения событий из источников данных Kafka.
Зависимости
Для использования Kafka в качестве источников данных, необходимо установить следующие зависимости:
- Плагин Корреляторы Событий
- расширение io-kafka (версия 4.1.2)
- расширение siddhi-map-json
- библиотеку kafka-clients
Все расширения должны быть установлены в директорию расширений, указанную в свойствах плагина Корреляторы Событий. Библиотека Kafka-clients должна быть установлена в поддиректорую lib установочной папки SberMobile.
Об источниках данных kafka
В Kafka сообщения создаются продюсерами. Продюсером может быть, например, устройство, отправляющее данные о температуре. Как только сообщение от продюсера достигает кластера Kafka, оно сохраняется и публикуется в топике. Получатели могут подписаться на топики, чтобы получать сохраненные сообщения.
В каждом топике есть одна или более партиций. Партиция действует как отдельный канал внутри топика и имеет идентификатор для доступа к ней. Партиции нумеруются, начиная с 0. Сообщения присваиваются определенной партиции на основании значения поля ключа, хранящегося в сообщении. Например, если несколько устройств включат свои идентификаторы в сообщения, каждому устройству может быть назначена отдельная партиция.
Движок коррелятора выступает в роли потребителя для топиков Kafka. Поток Kafka может подписаться на топик и получать данные из всех его партиций. Когда поток получает события, его можно использовать как обычный поток коррелятора. Например, вы можете отфильтровать входящий поток или обнаружить шаблоны во входящих событиях.
Источники данных kafka
Как и любой другой поток, потоки Kafka используют декораторы для определения параметров потока.
Более подробно о том, как работают потоки коррелятора событий, см. в разделе Скрипты коррелятора событий. |
Пример входного потока Kafka:
|
Параметры потока Kafka @Source:
Параметр | Опциональный | Описание |
type | Нет | Тип входного потока. Для Kafka источников значение всегда |
bootstrap.servers | Нет | Список хостов Kafka, разделенных запятой. Движок коррелятора будет получать события от указанных хостов. |
topic.list | Нет | Список топиков, разделенных запятой. Поток будет получать события из всех указанных топиков. |
group.id | Нет | Имя группы получателей. Если вы хотите разделить сообщения из одного топика между несколькими входящими потоками, используйте то же самое имя группы. Это позволит избежать получения потоками дублей событий. |
threading.option | Нет | Определяет способ обработки источника. Возможные значения:
|
seq.enabled | Да | Опция используется, когда важна последовательность получаемых событий. В этом случае, поток должен определить признак, который будет выступать идентификатором. Параметр необязательный и по умолчанию выставлен на |
is.binary.message | Да | Если события в бинарном формате, этот параметр должен быть выставлен на |
topic.offset.map | Да | Определяет офсеты топиков в формате |
optional.configuration | Да | Определяет все остальные параметры настроек получателя. Формат |
@map | Нет | Маппер входных данных. Расширение Kafka поддерживает входные данные в форматах текс, xml, json, а также в бинарном формате. Этот формат можно указать в параметре маппера. Например, для получения данных в формате json укажите маппер |
Пример
Следующий скрипт получает данные из семи партиций (0-6) топика температура. Поток подключится к двум серверам из кластера Kafka: kafka1.example.com:9091 и kafka2.example.com:9092. Входящий поток имеет два поля: устройство и температура. Для любых событий со значением температуры выше 100, генерируется событие в исходящем потоке.
|