Данный метод интеграции с Kafka устарел. Для всех новых интеграций используйте драйвер Kafka. Если вы решили использовать Корреляторы событий для интеграции с Kafka, обратите внимание, что каждое расширение и зависимость, перечисленные ниже, могут иметь дополнительные подзависимости, которые также должны быть установлены. |
Интеграция с Kafka
В данном уроке объясняется, как использовать Корреляторы событий для получения событий из источников Kafka.
Зависимости
Для источников Kafka требуется установка следующих зависимостей.
Расширения должны быть установлены в директории расширений, указанной в свойствах плагина Корреляторы событий:
расширение io-kafka (Версии: 5.x и выше с групповым идентификатором
io.siddhi.extension.*
)расширение siddhi-map-json (Версии 5.x и выше с групповым идентификатором
io.siddhi.extension.*
)
Библиотека Kafka-clients и зависимости расширения должны быть установлены в папку <папка установки>\lib
в подкаталог папки установки SberMobile:
библиотека kafka-clients
WSO2 Carbon Siddhi Metrics Core (v3.0.57)
Jackson fasterxml:
Информация о загрузке и установке расширений Siddhi может быть полезна, если у вас возникли проблемы с правильной установкой необходимых зависимостей. |
Об источниках данных Kafka
В Kafka сообщения создаются продюсерами. Например, продюсером может быть устройство, которое отправляет данные о температуре. Когда сообщение от производителя достигает кластера Kafka, оно сохраняется и публикуется в топике. Получатели могут подписаться на эти топики, чтобы получать сохраненные сообщения.
Каждая тема имеет один или несколько партиций. Партиция - это отдельный канал внутри топика, доступ к которому осуществляется по его идентификатору. Партиции нумеруются, начиная с 0. Сообщения назначаются конкретному разделу на основе значения ключевого поля, хранящегося в сообщении. Например, если несколько устройств включают в сообщения свой идентификатор, каждому из них может быть назначен свой раздел.
Движок коррелятора выступает в роли потребителя топиков Kafka. Поток Kafka может подписаться на топик и получить данные из всех его партиций. Как только поток получает события, вы можете использовать его как обычный поток коррелятора. Например, вы можете фильтровать входной поток или обнаруживать закономерности во входящих событиях.
Источники данных Kafka
Как и любой другой поток, потоки Kafka используют декораторы для определения параметров потока. Более подробную информацию о том, как работают потоки коррелятора событий, можно найти в разделе Скрипты коррелятора событий.
Пример потока источников Kafka:
@Source(
type="kafka",
bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092",
topic.list="example", group.id="correlator",
threading.option="single.thread",
@map(type='json'))
define stream InKafkaTemperature (device string, temperature int);
Параметры потока Kafka @Source:
Параметр | Опция | Описание |
type | Нет | Тип входного потока. Для источников данных Kafka значение всегда kafka. |
bootstrap.servers | Нет | Список хостов Kafka, разделенных запятой. Движок коррелятора будет получать события с указанных хостов. |
topic.list | Нет | Список топиков, разделенных запятыми. Поток будет получать события от всех указанных топиков. |
Нет | Имя группы получателей. Если вы хотите разделить сообщения из одного топика между несколькими входящими потоками, используйте то же самое имя группы. Это позволит избежать получения потоками дублей событий. | |
threading.option | Нет | Определяет способ обработки этого источника. Возможные значения:
|
seq.enabled | Да | Опция используется, когда важна последовательность получаемых событий. В этом случае, поток должен определить признак, который будет выступать идентификатором. Параметр необязательный и по умолчанию выставлен на |
is.binary.message | Да | Если события имеют двоичный формат, этот параметр должен быть установлен в true. Текущий параметр необязателен и по умолчанию равен false. |
topic.offset.map | Да | Определяет смещения топиков в формате |
optional.configuration | Да | Определяет все остальные параметры настроек получателя. Формат |
@map | Нет | Маппер входных данных. Расширение Kafka поддерживает входные данные в форматах текс, xml, json, а также в бинарном формате. Этот формат можно указать в параметре маппера. Например, для получения данных в формате json укажите маппер |
Пример
Следующий скрипт получит данные из семи разделов (0-6) из топика "Температура". Поток подключается к двум серверам из кластера Kafka: kafka1.example.com:9091 и kafka2.example.com:9092. Входящий поток имеет два поля: устройство и температура. Для всех событий, имеющих значение температуры более 100, в выходном потоке генерируется событие.
@Source(
type="kafka",
bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092",
topic.list="temperature", group.id="correlator",
threading.option="single.thread",
partition.no.list="0,1,2,3,4,5,6",
seq.enabled="false",
is.binary.message="false",
@map(type='json'))
define stream InKafkaTemperature (device string, temperature int);
@Sink(type = 'internalEvent', event='test', @map(type='internalEvent'))
define stream OutKafkaTemperature (message string, temperature int);
from InKafkaTemperature[temperature > 100]
select
device as message,
temperature as temperature
insert into OutKafkaTemperature;