Kafka
Общее описание
Блок Kafka реализует взаимодействие с брокером сообщений Apache Kafka. Блок Kafka может использоваться в качестве:
- блока – отправителя
- блока – получателя.
Настройка блока Kafka
В данном разделе описаны параметры блока Kafka, которые необходимо заполнить при его настройке.
Базовые параметры
| Название в UI | Название атрибута | Описание | Значение по умолчанию | Тип данных |
|---|---|---|---|---|
| Dynamic Endpoint | dynamicEndpoint | Доступен только для блоков-отправителей. Параметр позволяет включить встроенную функциональность динамического определения вызываемоой конечной точки. При включении параметра структура блока меняется таким образом, что в коде вместо определения to проставляется toD, а значение самого вызываемого ресурса может собираться из переменной, например | False | Логическое значение |
| Pattern | pattern | Доступен только для блоков-отправителей. Устанавливает стиль взаимодействия с присоединенной частью потока. Возможные значения:
| Список | |
| Topic | topic | Обязательный параметр. Имя топика. На блоке-потребителе можно использовать запятую для разделения нескольких топиков. Блок-отправитель может посылать сообщения только в один топик. | Строка | |
| Connection | connection | Позволяет выбрать предсозданное соединение или создать новое с помощью визарда Connection Manager. | Список | |
| Additional Properties | additionalProperties | Устанавливает дополнительные свойства блока Kafka как в случае блока-потребителя, так и блока-отправителя. В случае, если они не могут быть установлены непосредственно в конфигурациях Camel, свойства должны иметь префикс additionalProperties. , например: Если свойства заданы в файле application.properties, они должны иметь префикс camel.component.kafka.additional-properties и свойство, заключенное в квадратные скобки, как в этом примере: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000. | Строка | |
| Brokers | brokers | URL-адрес используемых брокеров Kafka. Формат - host1:port1,host2:port2, и список может быть как подмножеством брокеров, так и виртуальным адресом, указывающим на подмножество брокеров. В документации по Kafka этот параметр известен как bootstrap.servers. | Строка | |
| Client Id | clientId | Идентификатор клиента – это указанная пользователем строка, отправляемая в каждом запросе для отслеживания вызовов. Она должна логически идентифицировать приложение, отправляющее запрос. | Строка | |
| Header Filter Strategy | headerFilterStrategy | Задает пользовательскую стратегию HeaderFilterStrategy для фильтрации заголовка во входящих и исходящих сообщениях. | Строка | |
| Reconnect Backoff Max Ms | reconnectBackoffMaxMs | Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, которому неоднократно не удавалось подключиться. Если значение задано, то время ожидания для каждого хоста будет экспоненциально увеличиваться при каждом последующем сбое соединения, вплоть до этого максимального значения. После расчета увеличения времени ожидания добавляется рандомизированное значение jitter (разница между наибольшей и наименьшей задержкой) в пределах ±20% от изначального, чтобы избежать сбоев в соединении. | 1000 | Целое число |
| Retry Backoff Max Ms | retryBackoffMaxMs | Максимальное время ожидания в миллисекундах при повторной неудачной отправке запроса брокеру, который неоднократно завершался неудачей. Если значение задано, то задержка для каждого клиента будет увеличиваться экспоненциально при каждом неудачном запросе, вплоть до этого максимального значения. Чтобы предотвратить синхронизацию всех клиентов при повторной попытке, к задержке будет применен рандомизированный jitter с коэффициентом 0,2, в результате чего задержка будет находиться в диапазоне от -20% до +20% относительно расчетного значения. Если значение Retry Backoff Ms больше, чем Retry Backoff Max Ms, то Retry Backoff Max Ms будет использоваться для постоянного отсчета с самого начала без какого-либо экспоненциального увеличения. | 1000 | Целое число |
| Retry Backoff Ms | retryBackoffMs | Время ожидания, в течение которого необходимо повторить неудачный запрос к заданной партиции топика. Это позволяет избежать повторной отправки запросов в замкнутом цикле в некоторых сценариях сбоя. Это значение является начальным значением задержки и будет экспоненциально увеличиваться для каждого неудачного запроса, вплоть до значения Retry Backoff Max Ms. | 100 | Целое число |
| Shutdown Timeout | shutdownTimeout | Время ожидания завершения работы потребителя или отправителя и завершения его рабочих потоков в миллисекундах. | 30000 | Целое число |
| Allow Manual Commit | allowManualCommit | Определяет разрешено ли выполнение ручных коммитов с помощью KafkaManualCommit. Если эта опция включена, то экземпляр KafkaManualCommit сохраняется в заголовке сообщения Exchange, что позволяет конечным пользователям получать доступ к этому API и выполнять ручные фиксации смещения с помощью Kafka потребителя. | False | Логическое значение |
| Auto Commit Enable | autoCommitEnable | Если значение равно True, то смещение сообщений, уже полученных получателем, периодически фиксируется в ZooKeeper. Это зафиксированное смещение будет использоваться при сбое процесса в качестве позиции, с которой начнет работу новый получатель. | True | Логическое значение |
| Auto Commit Interval Ms | autoCommitIntervalMs | Периодичность автоматических комитов в миллисекундах. | 5000 | Целое число |
| Auto Offset Reset | autoOffsetReset | Определяет что делать, если в ZooKeeper нет начального смещения или если смещение выходит за пределы диапазона: самое раннее: автоматически сбросить смещение на самое раннее смещение >самое позднее: автоматически сбросить смещение на самое последнее смещение ошибка: выдать исключение получателю. Возможные значения:
| latest | Строка |
| Batching | batching | Задает следует ли использовать пакетную обработку или потоковую передачу. Значение по умолчанию равно False, что означает использование потоковой передачи. В режиме потоковой передачи для каждого обмена в теле сообщения обрабатывается одна запись Kafka. В режиме пакетной обработки множество записей Kafka группируются в виде списка объектов в тексте сообщения. Параметр maxPollRecords используется для определения количества записей, которые необходимо сгруппировать в режиме пакетной обработки. | False | Логическое значение |
| Break On First Error | breakOnFirstError | Определяет, что происходит, когда получатель обрабатывает сообщение и оно завершается ошибкой. Если значение параметра равно False, получатель переходит к следующему сообщению и обрабатывает его. Если значение параметра равно True, получатель завершает работу. Использование NoopCommitManager по умолчанию приведет к тому, что потребитель не зафиксирует смещение, и будет предпринята повторная попытка отправки сообщения. Потребитель должен использовать KafkaManualCommit, чтобы определить наилучший способ обработки сообщения. Используя либо SyncCommitManager, либо AsyncCommitManager, потребитель вернется к смещению сообщения, вызвавшего сбой, а затем повторно попытается обработать это сообщение. Однако это может привести к бесконечной обработке одного и того же сообщения, если оно каждый раз приводит к сбою, например, к ошибочному сообщению. Поэтому рекомендуется для решения воспользоваться, например, обработчиком ошибок. | False | Логическое значение |
| Check Crcs | checkCrcs | Задает автоматическое выполнение проверки CRC32 используемых записей. Это гарантирует, что сообщения не повреждены в процессе передачи по сети или на диске. Эта проверка увеличивает затраты ресурсов, поэтому ее можно отключить в случаях, когда требуется высокая производительность. | True | Логическое значение |
| Commit Timeout Ms | commitTimeoutMs | Максимальное время в миллисекундах, в течение которого код будет ожидать завершения синхронной фиксации. | 5000 | Длинное целое число |
| Consumer Request Timeout Ms | consumerRequestTimeoutMs | Максимальное время, в течение которого клиент будет ожидать ответа на запрос. Если ответ не будет получен до истечения времени ожидания, клиент отправит запрос повторно, если это необходимо, или не выполнит запрос, если количество повторных попыток будет исчерпано. | 30000 | Целое число |
| Consumers Count | consumersCount | Количество потребителей, подключающихся к серверу Kafka. Каждый потребитель запускается в отдельном потоке, который извлекает и обрабатывает входящие данные. | 1 | Целое число |
| Fetch Max Bytes | fetchMaxBytes | Максимальный объем данных, который сервер должен возвращать для запроса данных. Это не абсолютный максимум, если первое сообщение в первом непустой партиции запрашиваемых данных превышает это значение, сообщение все равно будет возвращено, чтобы получатель мог продолжить работу. Максимальный размер сообщения, принимаемый брокером, определяется с помощью message.max.bytes (конфигурация брокера) или max.message.bytes (конфигурация топика). Примечание: Потребитель выполняет несколько выборок параллельно. | 52428800 | Целое число |
| Fetch Min Bytes | fetchMinBytes | Минимальный объем данных, который сервер должен вернуть по запросу. Если данных недостаточно, запрос будет ждать, пока накопится необходимый объем данных, прежде чем ответить. | 1 | Целое число |
| Fetch Wait Max Ms | fetchWaitMaxMs | Максимальное количество времени, которое сервер будет блокировать перед ответом на запрос, если недостаточно данных для немедленного выполнения fetch.min.bytes. | 500 | Целое число |
| Group Id | groupId | Строка, однозначно идентифицирующая группу процессов-потребителей, к которой принадлежит данный потребитель. Задавая один и тот же идентификатор группы, несколько процессов могут указывать на то, что все они являются частью одной и той же группы потребителей. Этот параметр необходим для блоков-потребителей. | Строка | |
| Group Instance Id | groupInstanceId | Уникальный идентификатор экземпляра потребителя, предоставленный конечным пользователем. Разрешены только непустые строки. Если значение задано, потребитель рассматривается как статический участник, что означает, что в группе потребителей в любой момент времени разрешен только один экземпляр с этим идентификатором. Это можно использовать в сочетании с увеличением времени ожидания сеанса, чтобы избежать перебалансировки группы, вызванной временной недоступностью (например, перезапуском процесса). Если параметр не задан, получатель присоединится к группе в качестве динамического участника, что является традиционным поведением. | Строка | |
| Header Deserializer | headerDeserializer | Пользовательский KafkaHeaderDeserializer для десериализации значений заголовков Kafka. | Строка | |
| Heartbeat Interval Ms | heartbeatIntervalMs | Ожидаемое время между передачами сообщений координатору потребителей средствами группового управления Kafka. Частота периодических запросов используется для обеспечения того, чтобы сеанс получателя оставался активным, и для облегчения восстановления баланса, когда новые потребители присоединяются к группе или покидают ее. Значение должно быть установлено ниже, чем Session Timeout Ms , но обычно должно быть установлено значение, не превышающее 1/3 от этого значения. Его можно отрегулировать еще ниже, чтобы контролировать ожидаемое время нормальной балансировки. | 3000 | Целое число |
| Key Deserializer | keyDeserializer | Класс десериализатора для ключа, который реализует интерфейс десериализатора. | org.apache.kafka.common.serialization.StringDeserializer | Строка |
| Max Partition Fetch Bytes | maxPartitionFetchBytes | Максимальный объем данных для каждой партиции, который сервер вернет. Максимальный объем памяти, используемый для запроса, будет #partitions max.partition.fetch.bytes. Этот размер должен быть не меньше максимального размера сообщения, разрешенного сервером, иначе отправитель может отправлять сообщения большего размера, чем может принять получатель. В этом случае получатель может зависнуть при попытке получить большое сообщение в определенной партиции. | 1048576 | Целое число |
| Max Poll Interval Ms | maxPollIntervalMs | Максимальная задержка между вызовами функции poll() при использовании управления группами получателей. Это устанавливает верхнюю границу времени, в течение которого получатель может бездействовать, прежде чем получит дополнительные записи. Если функция poll() не будет вызвана до истечения этого таймаута, то получатель будет считаться завершившим работу, и группа выполнит повторную балансировку, чтобы переназначить партиции другому участнику. | Целое число | |
| Max Poll Records | maxPollRecords | Максимальное количество записей, возвращаемых за один вызов функции poll(). | 500 | Целое число |
| Offset Repository | offsetRepository | Репозиторий смещений, используемый для локального хранения смещений каждой партиции топика. При определении одного из них автоматический commit будет отключен. | Строка | |
| Partition Assignor | partitionAssignor | Имя класса стратегии назначения партиций, которую клиент будет использовать для распределения владения партициями между экземплярами-потребителями при использовании группового управления. | org.apache.kafka.clients.consumer.RangeAssignor | Строка |
| Poll On Error | pollOnError | Определяет что делать, если брокер сообщений Kafka выдал исключение при запросе новых сообщений. По умолчанию будет использоваться значение из конфигурации компонента, если на уровне конечной точки не было настроено явное значение. Возможные значения:
| ERROR_HANDLER | Строка |
| Poll Timeout Ms | pollTimeoutMs | Время ожидания, используемый при запросе KafkaConsumer. | 5000 | Длинное целое число |
| Pre Validate Host And Port | preValidateHostAndPort | Определяет следует ли проверять, что порт хоста-посредника является допустимым и может быть разрешен DNS для известного хоста при запуске этого потребителя. Если проверка завершается неудачно, генерируется исключение, из-за которого срабатывает стратегия Fail Fast. Отключение этого параметра отложит проверку после запуска получателя, и повторное подключение будет продолжено в случае ошибки проверки или разрешения DNS. | True | Логическое значение |
| Seek To | seekTo | Определяет должен ли KafkaConsumer читать с начала или с конца при запуске. Возможные значения:
| Строка | |
| Session Timeout Ms | sessionTimeoutMs | Время ожидания в миллисекундах для обнаружения сбоев при использовании средств группового управления Kafka. | 45000 | Целое число |
| Specific Avro Reader | specificAvroReader | Включает использование специального средства чтения Avro для работы с документацией по реестрам с несколькими схемами с реализацией десериализаторов Avro. Эта опция доступна только извне (не в стандартном Apache Kafka). | False | Логическое значение |
| Topic Is Pattern | topicIsPattern | Определяет является ли топик шаблоном (регулярным выражением). Это можно использовать для подписки на динамическое количество топиков, соответствующих шаблону. | False | Логическое значение |
| Value Deserializer | valueDeserializer | Класс десериализатора для значения, которое реализует интерфейс десериализатора. | org.apache.kafka.common.serialization.StringDeserializer | Строка |
| Batch With Individual Headers | batchWithIndividualHeaders | Если эта функция включена и отдельным элементом пакета является Exchange или сообщение, отправитель сгенерирует для него отдельные значения заголовка Kafka, используя пакетное сообщение для определения значений. Обычное поведение заключается в том, что всегда используются одни и те же значения заголовка (которые определяются родительским Exchange, содержащим Iterable или итератор). | False | Логическое значение |
| Buffer Memory Size | bufferMemorySize | Общее количество байт памяти, которое отправитель может использовать для буферизации записей, ожидающих отправки на сервер. Если записи отправляются быстрее, чем они могут быть доставлены на сервер, отправитель либо блокирует, либо создает исключение на основе предпочтений, указанных в block.on.buffer.full.Этот параметр должен примерно соответствовать общему объему памяти, который будет использоваться отправителем, но не является жестким ограничением, поскольку не вся память, используемая отправителем, используется для буферизации. Некоторая дополнительная память будет использоваться для сжатия (если сжатие включено), а также для поддержки запросов в процессе выполнения. | 33554432 | Целое число |
| Compression Codec | compressionCodec | Задает кодек сжатия для всех данных, сгенерированных данным отправителем. Допустимые значения: none, gzip, snappy, lz4 и zstd. Возможные значения:
| none | Строка |
| Connection Max Idle Ms | connectionMaxIdleMs | Время в миллисекундах, после которого незанятые соединения будут закрыты. | 540000 | Целое число |
| Delivery Timeout Ms | deliveryTimeoutMs | Верхняя граница времени, необходимого для сообщения об успешном завершении или сбое после возврата вызова функции send(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (если оно ожидается) и время, отведенное для повторных сбоев отправки. | 120000 | Целое число |
| Enable Idempotence | enableIdempotence | Если установлено значение True, отправитель гарантирует, что в поток будет записана ровно одна копия каждого сообщения. Если установлено значение False, отправитель повторяет попытку из-за сбоев брокера и т.д., что может привести к записи дубликатов повторного сообщения в поток. Примечание: Для включения идемпотентности требуется, чтобы max.in.flight.requests.per.connection было меньше или равно 5 (при этом порядок сообщений сохранялся бы для любого допустимого значения), количество повторных попыток должно быть больше 0, а подтверждения должны быть "all". Идемпотентность включена по умолчанию, если не заданы конфликтующие параметры. Если заданы конфликтующие параметры, а идемпотентность явно не включена, то идемпотентность отключается. Если явно включена идемпотентность и заданы конфликтующие параметры, генерируется исключение ConfigException. | True | Логическое значение |
| Header Serializer | headerSerializer | Пользовательский KafkaHeaderSerializer для сериализации значений заголовков Kafka. | Строка | |
| Key | key | Ключ записи (или null, если ключ не указан). Если этот параметр был задан, то он имеет приоритет над заголовком KafkaConstants#KEY. | Строка | |
| Key Serializer | keySerializer | Класс сериализатора для ключей (по умолчанию тот же, что и для сообщений, если ничего не указано). | org.apache.kafka.common.serialization.StringSerializer | Строка |
| Linger Ms | lingerMs | Отправитель группирует все записи, поступающие в промежутках между передачами запросов, в один пакетный запрос. Обычно это происходит только при загрузке, когда записи поступают быстрее, чем их можно отправить. Однако в некоторых случаях клиент может захотеть уменьшить количество запросов даже при умеренной нагрузке. Эта настройка позволяет добиться этого путем добавления небольшой искусственной задержки. То есть, вместо того, чтобы немедленно отправлять запись, отправитель будет ждать до заданной задержки, чтобы разрешить отправку других записей, чтобы их можно было объединить в пакет. Это поведение можно рассматривать как аналог алгоритма Nagle в TCP. Этот параметр определяет верхнюю границу задержки для пакетной обработки: как только мы получим количество записей в формате batch.size для партиции, оно будет отправлено немедленно, независимо от этого параметра, однако, если для этой партиции накоплено меньше этого количества байт, мы "задержимся" на указанное время ожидания чтобы появилось больше записей. По умолчанию этот параметр равен 0 (т.е. задержки нет). Например, установка значения linger.ms=5 уменьшит количество отправляемых запросов, но увеличит задержку до 5 мс для записей, отправляемых в отсутствие загрузки. | 0 | Целое число |
| Max Block Ms | maxBlockMs | Определяет, как долго будут блокироваться методы KafkaProducer send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() и abortTransaction(). Для функции send() эта задержка ограничивает общее время ожидания как выборки метаданных, так и выделения буфера (блокировка в предоставляемых пользователем сериализаторах или разделителях не учитывается в этом тайм-ауте). Для partitionsFor() это время ожидания ограничивает время, затрачиваемое на ожидание метаданных, если они недоступны. Методы, связанные с транзакциями, всегда блокируются, но могут истечь, если координатор транзакции не может быть обнаружен или не ответил в течение времени ожидания. | 60000 | Целое число |
| Max In Flight Request | maxInFlightRequest | Максимальное количество неподтвержденных запросов, которые клиент отправит по одному соединению перед блокировкой. Примечание: Если для этого параметра установлено значение, превышающее 1, и имеются неудачные отправки, существует риск переупорядочивания сообщений из-за повторных попыток, если повторные попытки включены. | 5 | Целое число |
| Max Request Size | maxRequestSize | Максимальный размер запроса. Это также, по сути, ограничение на максимальный размер записи. Примечание: У сервера есть собственное ограничение на размер записи, которое может отличаться от значения данного параметра. Этот параметр ограничивает количество пакетов записей, которые отправитель отправляет в одном запросе, чтобы избежать отправки огромных запросов. | 1048576 | Целое число |
| Metadata Max Age Ms | metadataMaxAgeMs | Период времени в миллисекундах, по истечении которого метаданные принудительно обновляются, даже если не было никаких изменений в управлении партициями, которые позволили бы заблаговременно обнаружить любые новые брокеры или партиции. | 300000 | Целое число |
| Metric Reporters | metricReporters | Список классов, которые будут использоваться в качестве отчетов о показателях. Реализация интерфейса MetricReporter позволяет подключать классы, которые будут получать уведомления о создании новых показателей. JmxReporter всегда включен для регистрации статистики JMX. | Строка | |
| Metrics Sample Window Ms | metricsSampleWindowMs | Временной интервал, за который вычисляется выборка показателей. | 30000 | Целое число |
| No Of Metrics Sample | noOfMetricsSample | Количество выборок, поддерживаемых для вычисления показателей. | 2 | Целое число |
| Partitioner | partitioner | Класс Partitioner для разделения сообщений между под-топиками. По умолчанию Partitioner основан на хэше ключа. | Строка | |
| Partitioner Ignore Keys | partitionerIgnoreKeys | Задает следует ли игнорировать ключи сообщений при вычислении партиции. Этот параметр действует только в том случае, если Partitioner не установлен. | False | Логическое значение |
| Partition Key | partitionKey | Партиция, в которую будет отправлена запись (или Null, если партиция не была указана). Если этот параметр был определен, то он имеет приоритет над заголовком KafkaConstants#PARTITION_KEY. | Целое число | |
| Producer Batch Size | producerBatchSize | Отправитель будет пытаться объединить записи в пакет с меньшим количеством запросов всякий раз, когда в одну и ту же партицию отправляется несколько записей. Это повышает производительность как на клиенте, так и на сервере. Эта конфигурация определяет размер пакета по умолчанию в байтах. Не будут предприняты попытки объединить записи, размер которых превышает этот размер. Запросы, отправляемые брокерам, будут содержать несколько пакетов, по одному для каждой партиции с данными, доступными для отправки. Небольшой размер пакета сделает пакетную обработку менее распространенной и может снизить пропускную способность (нулевой размер пакета полностью отключит пакетную обработку). Очень большой размер пакета может привести к более расточительному использованию памяти, поскольку всегда будет выделяться буфер указанного размера пакета в ожидании дополнительных записей. | 16384 | Целое число |
| Queue Buffering Max Messages | queueBufferingMaxMessages | Максимальное количество неотправленных сообщений, которое может быть помещено в очередь отправителя при использовании асинхронного режима, прежде чем отправитель будет заблокирован или данные будут удалены. | 10000 | Целое число |
| Receive Buffer Bytes | receiveBufferBytes | Размер буфера приема TCP (SO_RCVBUF), используемого при чтении данных. | 65536 | Целое число |
| Reconnect Backoff Ms | reconnectBackoffMs | Время ожидания перед попыткой повторного подключения к данному хосту. Это позволяет избежать повторного подключения к хосту в замкнутом цикле. Эта задержка применяется ко всем запросам, отправляемым получателем брокеру. | 50 | Целое число |
| Record Metadata | recordMetadata | Задает будут ли сохраняться мета-данные, которые пришли в подтверждение приема сообщения на стороне потребителя. Например, после отправки сообщения в топик, оттуда приходит подтверждение получения вместе с набором данных. Параметр Record Metadata определяет будут эти данные сохранены или нет. Результаты сохраняются в списке, содержащем метаданные RecordMetadata. Список хранится в заголовке с ключом KafkaConstants#KAFKA_RECORDMETA. | True | Логическое значение |
| Request Required Acks | requestRequiredAcks | Количество подтверждений, которые отправитель должен получить от лидера, прежде чем считать запрос выполненным. Это определяет срок жизни отправляемых записей. Разрешены следующие настройки: acks=0 – отправитель вообще не будет ждать подтверждения от сервера. Запись будет немедленно добавлена в буфер сокета и будет считаться отправленной. В этом случае невозможно гарантировать, что сервер получил запись, и настройка повторной попытки не сработает (поскольку клиент, как правило, не узнает о каких-либо сбоях). Значение смещения, возвращаемое для каждой записи, всегда будет равно -1. acks=1 – лидер внесет запись в свой локальный лог, но ответит, не дожидаясь полного подтверждения от всех последователей. В этом случае, если у лидера произойдет сбой сразу после подтверждения записи, но до того, как последователи ее скопируют, запись будет потеряна. acks=all – лидер будет ждать подтверждения записи полным набором синхронизированных реплик. Это гарантирует, что запись не будет потеряна, пока хотя бы одна синхронизированная реплика остается активной. Это самая надежная из доступных гарантий. Это эквивалентно параметру acks=-1. Примечание: Для включения идемпотентности требуется, чтобы значение этого параметра было all. Если заданы конфликтующие параметры, а идемпотентность явно не включена, то идемпотентность отключается. Возможные значения:
| all | Строка |
| Request Timeout Ms | requestTimeoutMs | Период времени, в течение которого брокер будет ждать, пытаясь выполнить требование Request Required Acks, прежде чем отправить клиенту сообщение об ошибке. | 30000 | Целое число |
| Retries | retries | Установка значения, превышающего ноль, приведет к повторной отправке клиентом любой записи, которую не удалось отправить из-за возможной временной ошибки. Обратите внимание, что эта повторная попытка ничем не отличается от повторной отправки записи клиентом после получения сообщения об ошибке. Запросы на отправку будут завершены ошибкой до того, как будет исчерпано количество повторных попыток, если время ожидания, заданное с помощью Delivery Timeout Ms, истечет раньше, чем произойдет успешное подтверждение. Обычно пользователи предпочитают не устанавливать эту настройку и вместо этого использовать Delivery Timeout Ms для управления поведением при повторных попытках. Для включения идемпотентности требуется, чтобы это значение конфигурации было больше 0. Если заданы конфликтующие параметры и идемпотентность явно не включена, идемпотентность отключается. Разрешение повторных попыток при установке Enable Idempotence в значение False и max.in.flight.requests.per.connection в значение 1 потенциально изменит порядок записей, потому что, если два пакета отправляются в одну партицию, и первый завершается неудачей и повторяется попытка, но второй выполняется успешно, то записи из второго пакета могут появиться первыми. | Целое число | |
| Send Buffer Bytes | sendBufferBytes | Размер буфера записи сокета. | 131072 | Целое число |
| Use Iterator | useIterator | Определяет, следует ли при отправке в Kafka отправлять текст сообщения в виде одной записи или использовать java.util.Iterator для отправки нескольких записей в Kafka (если можно выполнить итерацию текста сообщения). | True | Логическое значение |
| Value Serializer | valueSerializer | Класс сериализатора для сообщений. | org.apache.kafka.common.serialization.StringSerializer | Строка |
| Worker Pool | workerPool | Пользовательский рабочий пул для продолжения потока после того, как сервер Kafka подтвердит сообщение, отправленное ему от KafkaProducer, используя асинхронную неблокирующую обработку. Если используется этот параметр, то вы должны управлять жизненным циклом пула потоков, чтобы закрыть пул, когда он больше не нужен. | Строка | |
| Worker Pool Core Size | workerPoolCoreSize | Количество основных потоков для рабочего пула для продолжения потока после того, как сервер Kafka подтвердит сообщение, отправленное ему от KafkaProducer, используя асинхронную неблокирующую обработку. | 10 | Целое число |
| Worker Pool Max Size | workerPoolMaxSize | Максимальное количество потоков рабочего пула для продолжения потока после того, как сервер Kafka подтвердит сообщение, отправленное ему от KafkaProducer, используя асинхронную неблокирующую обработку. | 20 | Целое число |
| Interceptor Classes | interceptorClasses | Устанавливает перехватчики для отправителя или потребителей. Перехватчики-отправители должны быть классами, реализующими Перехватчики-потребители должны быть классами, реализующими Примечание: Если вы используете Producer interceptor для потребителя, он сгенерирует исключение ClassCastException во время выполнения. | Строка | |
| Schema Registry URL | schemaRegistryURL | URL-адрес используемых серверов реестра схем. Формат - host1:port1,host2:port2. В документации по реестрам нескольких схем он известен как schema.registry.url. | Строка |
Расширенные параметры
| Название в UI | Название атрибута | Описание | Значение по умолчанию | Тип данных |
|---|---|---|---|---|
| Bridge Error Handler | bridgeErrorHandler | Позволяет передавать возникшую ошибку из блока «наверх», т. е. на уровень потока, позволяя самостоятельно сконфигурировать собственные правила обработки ошибки через вспомогательные блоки. Примечание: Это возможно только в том случае, если сторонний компонент позволяет системе получать оповещение о возникновении исключения. Некоторые компоненты обрабатывают это только внутренне, и поэтому применение параметра невозможно. По умолчанию блок будет использовать встроенный обработчик исключений. | False | Логическое значение |
| Exception Handler | exceptionHandler | Пользовательский обработчик исключений. Примечание: Если опция Bridge Error Handler включена, то эта опция не используется. По умолчанию получатель будет обрабатывать исключения, которые будут логироваться на уровне WARN или ERROR и игнорироваться. | Строка | |
| Exchange Pattern | exchangePattern | Устанавливает стиль взаимодействия с присоединенной частью потока. Возможные значения:
| Строка | |
| Isolation Level | isolationLevel | Управляет чтением сообщений, записанных транзакционно. Если установлено значение read_committed, функция consumer.poll() будет возвращать только те транзакционные сообщения, которые были зафиксированы. Если установлено значение read_uncommitted (по умолчанию), функция consumer.poll() будет возвращать все сообщения, даже сообщения о транзакциях, которые были прерваны. Сообщения, не связанные с транзакциями, будут возвращаться безусловно в любом режиме. Сообщения всегда будут возвращаться в порядке смещения. Следовательно, в режиме read_committed функция consumer.poll() будет возвращать сообщения только до последнего стабильного смещения (LSO), которое на единицу меньше смещения первой открытой транзакции. В частности, любые сообщения, появляющиеся после сообщений, относящихся к текущим транзакциям, будут удерживаться до тех пор, пока не будет завершена соответствующая транзакция. В результате получатели, использующие read_committed, не смогут прочитать данные до последнего смещения последнего сообщения, которое было успешно реплицировано (high watermark) при транзакциях в процессе. Кроме того, при задании значения read_committed метод seekToEnd вернет LSO. Возможные значения:
| read_uncommitted | Строка |
| Kafka Manual Commit Factory | kafkaManualCommitFactory | Позволяет указать пользовательскую фабрику для создания пользовательских экземпляров KafkaManualCommit на случай, если при выполнении ручных коммитов требуется специальная логика, отличающаяся от реализации по умолчанию, которая поставляется в готовом виде. | Строка | |
| Kafka Client Factory | kafkaClientFactory | Пользовательский компонент, который создаёт экземпляры клиентов отправителей и получателей – Это позволяет настроить создание экземпляров с логикой, расширяющей возможности клиентов vanilla Kafka. | Строка | |
| Lazy Start Producer | lazyStartProducer | Определяет должен ли отправитель стартовать в отложенном режиме (при получении первого сообщения). Отложенный запуск можно использовать в ситуациях, когда запуск отправителя может завершиться неудачей и привести к сбою при запуске маршрута. Если отложить запуск, то сбой при запуске можно будет обработать во время маршрутизации сообщений с помощью обработчиков ошибок маршрутизации. Обратите внимание, что при обработке первого сообщения создание и старт отправителя могут занять некоторое время и увеличить общее время обработки. | False | Логическое значение |
| Synchronous | synchronous | Определяет, следует ли строго использовать синхронную обработку. | False | Логическое значение |
Параметры безопасности
| Название в UI | Название атрибута | Описание | Значение по умолчанию | Тип данных |
|---|---|---|---|---|
| Kerberos Before Relogin Min Time | kerberosBeforeReloginMinTime | Время ожидания потока входа в систему между попытками обновления. | 60000 | Целое число |
| Kerberos Config Location | kerberosConfigLocation | Расположение конфигурационного файла kerberos. | Строка | |
| Kerberos Init Cmd | kerberosInitCmd | Путь к команде Kerberos kinit. | /usr/bin/kinit | Строка |
| Kerberos Principal To Local Rules | kerberosPrincipalToLocalRules | Список правил для преобразования имен участников в короткие имена (обычно это имена пользователей операционной системы). Правила вычисляются по порядку, и первое правило, которое соответствует имени участника, используется для преобразования его в короткое имя. Все последующие правила в списке игнорируются. По умолчанию основные имена в форме {username}/{hostname}{REALM} сопоставляются с {username}. Несколько значений могут быть разделены запятой. | DEFAULT | Строка |
| Kerberos Renew Jitter | kerberosRenewJitter | Процент jitter (разница между наибольшей и наименьшей задержкой), добавленный ко времени обновления. | 0.05 | Число |
| Kerberos Renew Window Factor | kerberosRenewWindowFactor | Поток входа в систему будет находиться в спящем режиме до тех пор, пока не будет достигнут указанный интервал времени с момента последнего обновления до истечения срока действия пропуска, после чего он попытается продлить действие пропуска. | 0.8 | Число |
| Sasl Jaas Config | saslJaasConfig | Укажите параметр kafka sasl.jaas.config, например: org.apache.kafka.common.security.plain.В модуле plainlogin требуется username=USERNAME password=PASSWORD. | Строка | |
| Sasl Kerberos Service Name | saslKerberosServiceName | Основное имя Kerberos, от имени которого запускается Kafka. Это может быть определено либо в конфигурации JAAS Kafka, либо в конфигурации Kafka. | Строка | |
| Sasl Mechanism | saslMechanism | Механизм Simple Authentication and Security Layer (SASL). Допустимые значения приведены в разделе http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml . | GSSAPI | Строка |
| Security Protocol | securityProtocol | Протокол, используемый для связи с брокерами. Поддерживаются SASL_PLAINTEXT, PLAINTEXT, SASL_SSL и SSL. | PLAINTEXT | Строка |
| Ssl Cipher Suites | sslCipherSuites | Список наборов шифров. Это именованная комбинация алгоритмов аутентификации, шифрования, MAC и обмена ключами, используемая для согласования параметров безопасности сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров. | Строка | |
| Ssl Context Parameters | sslContextParameters | Настройка SSL с использованием объекта Camel SSLContextParameters. Если параметр определен, он применяется перед другими параметрами конечной точки SSL. Примечание: Kafka поддерживает подгрузку keystore только из файла по его полному пути. Поэтому добавьте префикс «file:» перед указанием пути до файла в KeyStoreParameters.resource | Строка | |
| Ssl Enabled Protocols | sslEnabledProtocols | Список протоколов, поддерживающих SSL-соединения. По умолчанию используется TLSv1.2,TLSv1.3 при запуске с Java 11 или новее, TLSv1.2 в противном случае. При использовании значения по умолчанию для Java 11 клиенты и серверы предпочтут TLSv1.3, если оба поддерживают его, и в противном случае вернутся к TLSv1.2 (при условии, что оба поддерживают как минимум TLSv1.2). Это значение по умолчанию должно подходить для большинства случаев. Также смотрите документацию по конфигурации для SSLProtocol. | Строка | |
| Ssl Endpoint Algorithm | sslEndpointAlgorithm | Алгоритм идентификации конечной точки для проверки имени хоста сервера с помощью сертификата сервера. Используйте none или false, чтобы отключить проверку имени хоста сервера. | https | Строка |
| Ssl Keymanager Algorithm | sslKeymanagerAlgorithm | Алгоритм, используемый Key Manager Factory для SSL-соединений. Значение по умолчанию – это алгоритм Key Manager Factory, настроенный для виртуальной машины Java. | SunX509 | Строка |
| Ssl Key Password | sslKeyPassword | Пароль от закрытого ключа в файле хранилища ключей или PEM-ключ, указанный в sslKeystoreKey. Это требуется для клиентов только в том случае, если настроена двусторонняя аутентификация. | Строка | |
| Ssl Keystore Location | sslKeystoreLocation | Расположение файла хранилища ключей. Это необязательное значение для клиента и может использоваться для двусторонней аутентификации клиента. | Строка | |
| Ssl Keystore Password | sslKeystorePassword | Пароль хранилища для файла хранилища ключей. Это необязательно для клиента и требуется только в том случае, если настроено sslKeystoreLocation. Пароль хранилища ключей не поддерживается для формата PEM. | Строка | |
| Ssl Keystore Type | sslKeystoreType | Формат файла хранилища ключей. Это необязательный параметр для клиента. | JKS | Строка |
| Ssl Protocol | sslProtocol | Протокол SSL, используемый для генерации SSLContext. Значение по умолчанию - TLSv1.3 при запуске с Java 11 или новее, в противном случае - TLSv1.2. Это значение должно подходить для большинства случаев использования. Допустимые значения в последних версиях JVM - TLSv1.2 и TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 и SSLv3 могут поддерживаться в более старых JVM, но их использование не рекомендуется из-за известных уязвимостей в системе безопасности. При использовании значения по умолчанию для этой конфигурации и sslEnabledProtocols клиенты перейдут на TLSv1.2, если сервер не поддерживает TLSv1.3. Если для этой конфигурации установлено значение TLSv1.2, клиенты не будут использовать TLSv1.3, даже если это одно из значений в sslEnabledProtocols, а сервер поддерживает только TLSv1.3. | Строка | |
| Ssl Provider | sslProvider | Имя поставщика безопасности, используемого для SSL-подключений. Значение по умолчанию - поставщик безопасности виртуальной машины JVM по умолчанию. | Строка | |
| Ssl Trustmanager Algorithm | sslTrustmanagerAlgorithm | Алгоритм, используемый Trust Manager Factory для SSL-подключений. Значение по умолчанию - это алгоритм Trust Manager Factory, настроенный для виртуальной машины Java. | PKIX | Строка |
| Ssl Truststore Location | sslTruststoreLocation | Расположение файла хранилища доверенных сертификатов. | Строка | |
| Ssl Truststore Password | sslTruststorePassword | Пароль для файла хранилища доверенных сертификатов. Если пароль не задан, настроенный файл хранилища доверенных сертификатов по-прежнему будет использоваться, но проверка целостности будет отключена. Пароль хранилища не поддерживается для формата PEM. | Строка | |
| Ssl Truststore Type | sslTruststoreType | Формат файла хранилища доверенных сертификатов. Значение по умолчанию - JKS. | JKS | Строка |