Перейти к основному содержимому
Руководство администратора
How To статьи
Установка и настройка
Компоненты
Руководство пользователя
Начало работы

Kafka

Общее описание

Блок Kafka реализует взаимодействие с брокером сообщений Apache Kafka. Блок Kafka может использоваться в качестве:

  • блока – отправителя
  • блока – получателя.

Настройка блока Kafka

В данном разделе описаны параметры блока Kafka, которые необходимо заполнить при его настройке.

Базовые параметры

Название в UIНазвание атрибутаОписаниеЗначение по умолчаниюТип данных
Dynamic EndpointdynamicEndpoint

Доступен только для блоков-отправителей.

Параметр позволяет включить встроенную функциональность динамического определения вызываемоой конечной точки. При включении параметра структура блока меняется таким образом, что в коде вместо определения to проставляется toD, а значение самого вызываемого ресурса может собираться из переменной, например ${header.foo}.

FalseЛогическое значение
Patternpattern

Доступен только для блоков-отправителей.

Устанавливает стиль взаимодействия с присоединенной частью потока.

Возможные значения:

  • InOnly — отправить без подтверждения приемки. При выборе в отправленном yaml и в режиме просмотра кода будет виден параметр pattern на том же уровне, что и uri.

  • InОut — ждать подтверждения после отправки.

Список
Topictopic

Обязательный параметр.

Имя топика. На блоке-потребителе можно использовать запятую для разделения нескольких топиков. Блок-отправитель может посылать сообщения только в один топик.

Строка
ConnectionconnectionПозволяет выбрать предсозданное соединение или создать новое с помощью визарда Connection Manager.Список
Additional PropertiesadditionalProperties

Устанавливает дополнительные свойства блока Kafka как в случае блока-потребителя, так и блока-отправителя.

В случае, если они не могут быть установлены непосредственно в конфигурациях Camel, свойства должны иметь префикс additionalProperties. , например: additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro

Если свойства заданы в файле application.properties, они должны иметь префикс camel.component.kafka.additional-properties и свойство, заключенное в квадратные скобки, как в этом примере: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000.

Строка
BrokersbrokersURL-адрес используемых брокеров Kafka. Формат - host1:port1,host2:port2, и список может быть как подмножеством брокеров, так и виртуальным адресом, указывающим на подмножество брокеров. В документации по Kafka этот параметр известен как bootstrap.servers.Строка
Client IdclientIdИдентификатор клиента – это указанная пользователем строка, отправляемая в каждом запросе для отслеживания вызовов. Она должна логически идентифицировать приложение, отправляющее запрос.Строка
Header Filter StrategyheaderFilterStrategyЗадает пользовательскую стратегию HeaderFilterStrategy для фильтрации заголовка во входящих и исходящих сообщениях. Строка
Reconnect Backoff Max MsreconnectBackoffMaxMsМаксимальное время ожидания в миллисекундах при повторном подключении к брокеру, которому неоднократно не удавалось подключиться. Если значение задано, то время ожидания для каждого хоста будет экспоненциально увеличиваться при каждом последующем сбое соединения, вплоть до этого максимального значения. После расчета увеличения времени ожидания добавляется рандомизированное значение jitter (разница между наибольшей и наименьшей задержкой) в пределах ±20% от изначального, чтобы избежать сбоев в соединении.1000Целое число
Retry Backoff Max Ms retryBackoffMaxMs

Максимальное время ожидания в миллисекундах при повторной неудачной отправке запроса брокеру, который неоднократно завершался неудачей. Если значение задано, то задержка для каждого клиента будет увеличиваться экспоненциально при каждом неудачном запросе, вплоть до этого максимального значения. Чтобы предотвратить синхронизацию всех клиентов при повторной попытке, к задержке будет применен рандомизированный jitter с коэффициентом 0,2, в результате чего задержка будет находиться в диапазоне от -20% до +20% относительно расчетного значения.

Если значение Retry Backoff Ms больше, чем Retry Backoff Max Ms, то Retry Backoff Max Ms будет использоваться для постоянного отсчета с самого начала без какого-либо экспоненциального увеличения.

1000Целое число
Retry Backoff MsretryBackoffMsВремя ожидания, в течение которого необходимо повторить неудачный запрос к заданной партиции топика. Это позволяет избежать повторной отправки запросов в замкнутом цикле в некоторых сценариях сбоя. Это значение является начальным значением задержки и будет экспоненциально увеличиваться для каждого неудачного запроса, вплоть до значения Retry Backoff Max Ms.100Целое число
Shutdown TimeoutshutdownTimeoutВремя ожидания завершения работы потребителя или отправителя и завершения его рабочих потоков в миллисекундах.30000Целое число
Allow Manual CommitallowManualCommitОпределяет разрешено ли выполнение ручных коммитов с помощью KafkaManualCommit. Если эта опция включена, то экземпляр KafkaManualCommit сохраняется в заголовке сообщения Exchange, что позволяет конечным пользователям получать доступ к этому API и выполнять ручные фиксации смещения с помощью Kafka потребителя.FalseЛогическое значение
Auto Commit EnableautoCommitEnableЕсли значение равно True, то смещение сообщений, уже полученных получателем, периодически фиксируется в ZooKeeper. Это зафиксированное смещение будет использоваться при сбое процесса в качестве позиции, с которой начнет работу новый получатель.TrueЛогическое значение
Auto Commit Interval MsautoCommitIntervalMsПериодичность автоматических комитов в миллисекундах.5000Целое число
Auto Offset Reset autoOffsetReset

Определяет что делать, если в ZooKeeper нет начального смещения или если смещение выходит за пределы диапазона:

  • самое раннее: автоматически сбросить смещение на самое раннее смещение

  • >самое позднее: автоматически сбросить смещение на самое последнее смещение

  • ошибка: выдать исключение получателю.

  • Возможные значения:

    • latest

    • earliest

    • none

    latest Строка
    BatchingbatchingЗадает следует ли использовать пакетную обработку или потоковую передачу. Значение по умолчанию равно False, что означает использование потоковой передачи. В режиме потоковой передачи для каждого обмена в теле сообщения обрабатывается одна запись Kafka. В режиме пакетной обработки множество записей Kafka группируются в виде списка объектов в тексте сообщения. Параметр maxPollRecords используется для определения количества записей, которые необходимо сгруппировать в режиме пакетной обработки.FalseЛогическое значение
    Break On First ErrorbreakOnFirstErrorОпределяет, что происходит, когда получатель обрабатывает сообщение и оно завершается ошибкой. Если значение параметра равно False, получатель переходит к следующему сообщению и обрабатывает его. Если значение параметра равно True, получатель завершает работу. Использование NoopCommitManager по умолчанию приведет к тому, что потребитель не зафиксирует смещение, и будет предпринята повторная попытка отправки сообщения. Потребитель должен использовать KafkaManualCommit, чтобы определить наилучший способ обработки сообщения. Используя либо SyncCommitManager, либо AsyncCommitManager, потребитель вернется к смещению сообщения, вызвавшего сбой, а затем повторно попытается обработать это сообщение. Однако это может привести к бесконечной обработке одного и того же сообщения, если оно каждый раз приводит к сбою, например, к ошибочному сообщению. Поэтому рекомендуется для решения воспользоваться, например, обработчиком ошибок.FalseЛогическое значение
    Check CrcscheckCrcsЗадает автоматическое выполнение проверки CRC32 используемых записей. Это гарантирует, что сообщения не повреждены в процессе передачи по сети или на диске. Эта проверка увеличивает затраты ресурсов, поэтому ее можно отключить в случаях, когда требуется высокая производительность.TrueЛогическое значение
    Commit Timeout MscommitTimeoutMsМаксимальное время в миллисекундах, в течение которого код будет ожидать завершения синхронной фиксации.5000Длинное целое число
    Consumer Request Timeout MsconsumerRequestTimeoutMsМаксимальное время, в течение которого клиент будет ожидать ответа на запрос. Если ответ не будет получен до истечения времени ожидания, клиент отправит запрос повторно, если это необходимо, или не выполнит запрос, если количество повторных попыток будет исчерпано.30000Целое число
    Consumers CountconsumersCountКоличество потребителей, подключающихся к серверу Kafka. Каждый потребитель запускается в отдельном потоке, который извлекает и обрабатывает входящие данные.1Целое число
    Fetch Max BytesfetchMaxBytes

    Максимальный объем данных, который сервер должен возвращать для запроса данных. Это не абсолютный максимум, если первое сообщение в первом непустой партиции запрашиваемых данных превышает это значение, сообщение все равно будет возвращено, чтобы получатель мог продолжить работу. Максимальный размер сообщения, принимаемый брокером, определяется с помощью message.max.bytes (конфигурация брокера) или max.message.bytes (конфигурация топика).

    Примечание:

    Потребитель выполняет несколько выборок параллельно.

    52428800Целое число
    Fetch Min BytesfetchMinBytesМинимальный объем данных, который сервер должен вернуть по запросу. Если данных недостаточно, запрос будет ждать, пока накопится необходимый объем данных, прежде чем ответить.1Целое число
    Fetch Wait Max MsfetchWaitMaxMsМаксимальное количество времени, которое сервер будет блокировать перед ответом на запрос, если недостаточно данных для немедленного выполнения fetch.min.bytes.500Целое число
    Group IdgroupIdСтрока, однозначно идентифицирующая группу процессов-потребителей, к которой принадлежит данный потребитель. Задавая один и тот же идентификатор группы, несколько процессов могут указывать на то, что все они являются частью одной и той же группы потребителей. Этот параметр необходим для блоков-потребителей.Строка
    Group Instance IdgroupInstanceIdУникальный идентификатор экземпляра потребителя, предоставленный конечным пользователем. Разрешены только непустые строки. Если значение задано, потребитель рассматривается как статический участник, что означает, что в группе потребителей в любой момент времени разрешен только один экземпляр с этим идентификатором. Это можно использовать в сочетании с увеличением времени ожидания сеанса, чтобы избежать перебалансировки группы, вызванной временной недоступностью (например, перезапуском процесса). Если параметр не задан, получатель присоединится к группе в качестве динамического участника, что является традиционным поведением.Строка
    Header DeserializerheaderDeserializerПользовательский KafkaHeaderDeserializer для десериализации значений заголовков Kafka.Строка
    Heartbeat Interval MsheartbeatIntervalMsОжидаемое время между передачами сообщений координатору потребителей средствами группового управления Kafka. Частота периодических запросов используется для обеспечения того, чтобы сеанс получателя оставался активным, и для облегчения восстановления баланса, когда новые потребители присоединяются к группе или покидают ее. Значение должно быть установлено ниже, чем Session Timeout Ms , но обычно должно быть установлено значение, не превышающее 1/3 от этого значения. Его можно отрегулировать еще ниже, чтобы контролировать ожидаемое время нормальной балансировки.3000Целое число
    Key DeserializerkeyDeserializerКласс десериализатора для ключа, который реализует интерфейс десериализатора.org.apache.kafka.common.serialization.StringDeserializerСтрока
    Max Partition Fetch BytesmaxPartitionFetchBytesМаксимальный объем данных для каждой партиции, который сервер вернет. Максимальный объем памяти, используемый для запроса, будет #partitions max.partition.fetch.bytes. Этот размер должен быть не меньше максимального размера сообщения, разрешенного сервером, иначе отправитель может отправлять сообщения большего размера, чем может принять получатель. В этом случае получатель может зависнуть при попытке получить большое сообщение в определенной партиции.1048576Целое число
    Max Poll Interval MsmaxPollIntervalMsМаксимальная задержка между вызовами функции poll() при использовании управления группами получателей. Это устанавливает верхнюю границу времени, в течение которого получатель может бездействовать, прежде чем получит дополнительные записи. Если функция poll() не будет вызвана до истечения этого таймаута, то получатель будет считаться завершившим работу, и группа выполнит повторную балансировку, чтобы переназначить партиции другому участнику.Целое число
    Max Poll RecordsmaxPollRecordsМаксимальное количество записей, возвращаемых за один вызов функции poll().500Целое число
    Offset RepositoryoffsetRepositoryРепозиторий смещений, используемый для локального хранения смещений каждой партиции топика. При определении одного из них автоматический commit будет отключен.Строка
    Partition AssignorpartitionAssignorИмя класса стратегии назначения партиций, которую клиент будет использовать для распределения владения партициями между экземплярами-потребителями при использовании группового управления.org.apache.kafka.clients.consumer.RangeAssignorСтрока
    Poll On Error pollOnError

    Определяет что делать, если брокер сообщений Kafka выдал исключение при запросе новых сообщений. По умолчанию будет использоваться значение из конфигурации компонента, если на уровне конечной точки не было настроено явное значение.

    Возможные значения:

    • DISCARD – отменит сообщение и запросит следующее,

    • ERROR_HANDLER – будет использовать обработчик ошибок для обработки исключения, а затем продолжит запрос следующего сообщения,

    • RECONNECT – повторно подключит потребителя и повторит попытку запроса сообщения,

    • RETRY – позволит получателю повторно заросить то же сообщение,

    • STOP – остановит получателя. Его необходимо будет запустить / перезапустить вручную, если получателю снова нужно получать сообщения.

    ERROR_HANDLER Строка
    Poll Timeout MspollTimeoutMsВремя ожидания, используемый при запросе KafkaConsumer.5000Длинное целое число
    Pre Validate Host And PortpreValidateHostAndPortОпределяет следует ли проверять, что порт хоста-посредника является допустимым и может быть разрешен DNS для известного хоста при запуске этого потребителя. Если проверка завершается неудачно, генерируется исключение, из-за которого срабатывает стратегия Fail Fast. Отключение этого параметра отложит проверку после запуска получателя, и повторное подключение будет продолжено в случае ошибки проверки или разрешения DNS.TrueЛогическое значение
    Seek ToseekTo

    Определяет должен ли KafkaConsumer читать с начала или с конца при запуске.

    Возможные значения:

    • BEGINNING

    • END

    Строка
    Session Timeout MssessionTimeoutMsВремя ожидания в миллисекундах для обнаружения сбоев при использовании средств группового управления Kafka.45000Целое число
    Specific Avro ReaderspecificAvroReaderВключает использование специального средства чтения Avro для работы с документацией по реестрам с несколькими схемами с реализацией десериализаторов Avro. Эта опция доступна только извне (не в стандартном Apache Kafka).FalseЛогическое значение
    Topic Is PatterntopicIsPatternОпределяет является ли топик шаблоном (регулярным выражением). Это можно использовать для подписки на динамическое количество топиков, соответствующих шаблону.FalseЛогическое значение
    Value DeserializervalueDeserializerКласс десериализатора для значения, которое реализует интерфейс десериализатора.org.apache.kafka.common.serialization.StringDeserializer Строка
    Batch With Individual HeadersbatchWithIndividualHeadersЕсли эта функция включена и отдельным элементом пакета является Exchange или сообщение, отправитель сгенерирует для него отдельные значения заголовка Kafka, используя пакетное сообщение для определения значений. Обычное поведение заключается в том, что всегда используются одни и те же значения заголовка (которые определяются родительским Exchange, содержащим Iterable или итератор).FalseЛогическое значение
    Buffer Memory SizebufferMemorySizeОбщее количество байт памяти, которое отправитель может использовать для буферизации записей, ожидающих отправки на сервер. Если записи отправляются быстрее, чем они могут быть доставлены на сервер, отправитель либо блокирует, либо создает исключение на основе предпочтений, указанных в block.on.buffer.full.Этот параметр должен примерно соответствовать общему объему памяти, который будет использоваться отправителем, но не является жестким ограничением, поскольку не вся память, используемая отправителем, используется для буферизации. Некоторая дополнительная память будет использоваться для сжатия (если сжатие включено), а также для поддержки запросов в процессе выполнения.33554432Целое число
    Compression Codec compressionCodec

    Задает кодек сжатия для всех данных, сгенерированных данным отправителем. Допустимые значения: none, gzip, snappy, lz4 и zstd.

    Возможные значения:

    • none

    • gzip

    • snappy

    • lz4

    • zstd

    none Строка
    Connection Max Idle MsconnectionMaxIdleMsВремя в миллисекундах, после которого незанятые соединения будут закрыты.540000Целое число
    Delivery Timeout MsdeliveryTimeoutMsВерхняя граница времени, необходимого для сообщения об успешном завершении или сбое после возврата вызова функции send(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (если оно ожидается) и время, отведенное для повторных сбоев отправки.120000Целое число
    Enable IdempotenceenableIdempotence

    Если установлено значение True, отправитель гарантирует, что в поток будет записана ровно одна копия каждого сообщения.

    Если установлено значение False, отправитель повторяет попытку из-за сбоев брокера и т.д., что может привести к записи дубликатов повторного сообщения в поток.

    Примечание:

    Для включения идемпотентности требуется, чтобы max.in.flight.requests.per.connection было меньше или равно 5 (при этом порядок сообщений сохранялся бы для любого допустимого значения), количество повторных попыток должно быть больше 0, а подтверждения должны быть "all". Идемпотентность включена по умолчанию, если не заданы конфликтующие параметры. Если заданы конфликтующие параметры, а идемпотентность явно не включена, то идемпотентность отключается. Если явно включена идемпотентность и заданы конфликтующие параметры, генерируется исключение ConfigException.

    TrueЛогическое значение
    Header SerializerheaderSerializerПользовательский KafkaHeaderSerializer для сериализации значений заголовков Kafka.Строка
    KeykeyКлюч записи (или null, если ключ не указан). Если этот параметр был задан, то он имеет приоритет над заголовком KafkaConstants#KEY.Строка
    Key SerializerkeySerializerКласс сериализатора для ключей (по умолчанию тот же, что и для сообщений, если ничего не указано).org.apache.kafka.common.serialization.StringSerializerСтрока
    Linger MslingerMs

    Отправитель группирует все записи, поступающие в промежутках между передачами запросов, в один пакетный запрос. Обычно это происходит только при загрузке, когда записи поступают быстрее, чем их можно отправить. Однако в некоторых случаях клиент может захотеть уменьшить количество запросов даже при умеренной нагрузке. Эта настройка позволяет добиться этого путем добавления небольшой искусственной задержки. То есть, вместо того, чтобы немедленно отправлять запись, отправитель будет ждать до заданной задержки, чтобы разрешить отправку других записей, чтобы их можно было объединить в пакет.

    Это поведение можно рассматривать как аналог алгоритма Nagle в TCP. Этот параметр определяет верхнюю границу задержки для пакетной обработки: как только мы получим количество записей в формате batch.size для партиции, оно будет отправлено немедленно, независимо от этого параметра, однако, если для этой партиции накоплено меньше этого количества байт, мы "задержимся" на указанное время ожидания чтобы появилось больше записей. По умолчанию этот параметр равен 0 (т.е. задержки нет). Например, установка значения linger.ms=5 уменьшит количество отправляемых запросов, но увеличит задержку до 5 мс для записей, отправляемых в отсутствие загрузки.

    0Целое число
    Max Block MsmaxBlockMsОпределяет, как долго будут блокироваться методы KafkaProducer send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() и abortTransaction(). Для функции send() эта задержка ограничивает общее время ожидания как выборки метаданных, так и выделения буфера (блокировка в предоставляемых пользователем сериализаторах или разделителях не учитывается в этом тайм-ауте). Для partitionsFor() это время ожидания ограничивает время, затрачиваемое на ожидание метаданных, если они недоступны. Методы, связанные с транзакциями, всегда блокируются, но могут истечь, если координатор транзакции не может быть обнаружен или не ответил в течение времени ожидания.60000Целое число
    Max In Flight RequestmaxInFlightRequest

    Максимальное количество неподтвержденных запросов, которые клиент отправит по одному соединению перед блокировкой.

    Примечание:

    Если для этого параметра установлено значение, превышающее 1, и имеются неудачные отправки, существует риск переупорядочивания сообщений из-за повторных попыток, если повторные попытки включены.

    5Целое число
    Max Request SizemaxRequestSize

    Максимальный размер запроса. Это также, по сути, ограничение на максимальный размер записи.

    Примечание:

    У сервера есть собственное ограничение на размер записи, которое может отличаться от значения данного параметра. Этот параметр ограничивает количество пакетов записей, которые отправитель отправляет в одном запросе, чтобы избежать отправки огромных запросов.

    1048576Целое число
    Metadata Max Age MsmetadataMaxAgeMsПериод времени в миллисекундах, по истечении которого метаданные принудительно обновляются, даже если не было никаких изменений в управлении партициями, которые позволили бы заблаговременно обнаружить любые новые брокеры или партиции.300000Целое число
    Metric ReportersmetricReportersСписок классов, которые будут использоваться в качестве отчетов о показателях. Реализация интерфейса MetricReporter позволяет подключать классы, которые будут получать уведомления о создании новых показателей. JmxReporter всегда включен для регистрации статистики JMX.Строка
    Metrics Sample Window MsmetricsSampleWindowMsВременной интервал, за который вычисляется выборка показателей.30000Целое число
    No Of Metrics SamplenoOfMetricsSampleКоличество выборок, поддерживаемых для вычисления показателей.2Целое число
    PartitionerpartitionerКласс Partitioner для разделения сообщений между под-топиками. По умолчанию Partitioner основан на хэше ключа.Строка
    Partitioner Ignore KeyspartitionerIgnoreKeysЗадает следует ли игнорировать ключи сообщений при вычислении партиции. Этот параметр действует только в том случае, если Partitioner не установлен.FalseЛогическое значение
    Partition KeypartitionKeyПартиция, в которую будет отправлена запись (или Null, если партиция не была указана). Если этот параметр был определен, то он имеет приоритет над заголовком KafkaConstants#PARTITION_KEY.Целое число
    Producer Batch SizeproducerBatchSizeОтправитель будет пытаться объединить записи в пакет с меньшим количеством запросов всякий раз, когда в одну и ту же партицию отправляется несколько записей. Это повышает производительность как на клиенте, так и на сервере. Эта конфигурация определяет размер пакета по умолчанию в байтах. Не будут предприняты попытки объединить записи, размер которых превышает этот размер. Запросы, отправляемые брокерам, будут содержать несколько пакетов, по одному для каждой партиции с данными, доступными для отправки. Небольшой размер пакета сделает пакетную обработку менее распространенной и может снизить пропускную способность (нулевой размер пакета полностью отключит пакетную обработку). Очень большой размер пакета может привести к более расточительному использованию памяти, поскольку всегда будет выделяться буфер указанного размера пакета в ожидании дополнительных записей.16384Целое число
    Queue Buffering Max MessagesqueueBufferingMaxMessagesМаксимальное количество неотправленных сообщений, которое может быть помещено в очередь отправителя при использовании асинхронного режима, прежде чем отправитель будет заблокирован или данные будут удалены.10000Целое число
    Receive Buffer BytesreceiveBufferBytesРазмер буфера приема TCP (SO_RCVBUF), используемого при чтении данных.65536Целое число
    Reconnect Backoff MsreconnectBackoffMsВремя ожидания перед попыткой повторного подключения к данному хосту. Это позволяет избежать повторного подключения к хосту в замкнутом цикле. Эта задержка применяется ко всем запросам, отправляемым получателем брокеру.50Целое число
    Record MetadatarecordMetadata

    Задает будут ли сохраняться мета-данные, которые пришли в подтверждение приема сообщения на стороне потребителя.

    Например, после отправки сообщения в топик, оттуда приходит подтверждение получения вместе с набором данных. Параметр Record Metadata определяет будут эти данные сохранены или нет.

    Результаты сохраняются в списке, содержащем метаданные RecordMetadata. Список хранится в заголовке с ключом KafkaConstants#KAFKA_RECORDMETA.

    TrueЛогическое значение
    Request Required AcksrequestRequiredAcks

    Количество подтверждений, которые отправитель должен получить от лидера, прежде чем считать запрос выполненным. Это определяет срок жизни отправляемых записей. Разрешены следующие настройки:

    acks=0 – отправитель вообще не будет ждать подтверждения от сервера. Запись будет немедленно добавлена в буфер сокета и будет считаться отправленной. В этом случае невозможно гарантировать, что сервер получил запись, и настройка повторной попытки не сработает (поскольку клиент, как правило, не узнает о каких-либо сбоях). Значение смещения, возвращаемое для каждой записи, всегда будет равно -1.

    acks=1 – лидер внесет запись в свой локальный лог, но ответит, не дожидаясь полного подтверждения от всех последователей. В этом случае, если у лидера произойдет сбой сразу после подтверждения записи, но до того, как последователи ее скопируют, запись будет потеряна.

    acks=all – лидер будет ждать подтверждения записи полным набором синхронизированных реплик. Это гарантирует, что запись не будет потеряна, пока хотя бы одна синхронизированная реплика остается активной. Это самая надежная из доступных гарантий. Это эквивалентно параметру acks=-1.

    Примечание:

    Для включения идемпотентности требуется, чтобы значение этого параметра было all. Если заданы конфликтующие параметры, а идемпотентность явно не включена, то идемпотентность отключается.

    Возможные значения:

    • all

    • - 1

    • 0

    allСтрока
    Request Timeout MsrequestTimeoutMsПериод времени, в течение которого брокер будет ждать, пытаясь выполнить требование Request Required Acks, прежде чем отправить клиенту сообщение об ошибке.30000Целое число
    Retriesretries

    Установка значения, превышающего ноль, приведет к повторной отправке клиентом любой записи, которую не удалось отправить из-за возможной временной ошибки.

    Обратите внимание, что эта повторная попытка ничем не отличается от повторной отправки записи клиентом после получения сообщения об ошибке. Запросы на отправку будут завершены ошибкой до того, как будет исчерпано количество повторных попыток, если время ожидания, заданное с помощью Delivery Timeout Ms, истечет раньше, чем произойдет успешное подтверждение. Обычно пользователи предпочитают не устанавливать эту настройку и вместо этого использовать Delivery Timeout Ms для управления поведением при повторных попытках. Для включения идемпотентности требуется, чтобы это значение конфигурации было больше 0. Если заданы конфликтующие параметры и идемпотентность явно не включена, идемпотентность отключается. Разрешение повторных попыток при установке Enable Idempotence в значение False и max.in.flight.requests.per.connection в значение 1 потенциально изменит порядок записей, потому что, если два пакета отправляются в одну партицию, и первый завершается неудачей и повторяется попытка, но второй выполняется успешно, то записи из второго пакета могут появиться первыми.

    Целое число
    Send Buffer BytessendBufferBytesРазмер буфера записи сокета.131072Целое число
    Use IteratoruseIteratorОпределяет, следует ли при отправке в Kafka отправлять текст сообщения в виде одной записи или использовать java.util.Iterator для отправки нескольких записей в Kafka (если можно выполнить итерацию текста сообщения).TrueЛогическое значение
    Value SerializervalueSerializerКласс сериализатора для сообщений.org.apache.kafka.common.serialization.StringSerializerСтрока
    Worker PoolworkerPoolПользовательский рабочий пул для продолжения потока после того, как сервер Kafka подтвердит сообщение, отправленное ему от KafkaProducer, используя асинхронную неблокирующую обработку. Если используется этот параметр, то вы должны управлять жизненным циклом пула потоков, чтобы закрыть пул, когда он больше не нужен.Строка
    Worker Pool Core SizeworkerPoolCoreSizeКоличество основных потоков для рабочего пула для продолжения потока после того, как сервер Kafka подтвердит сообщение, отправленное ему от KafkaProducer, используя асинхронную неблокирующую обработку.10Целое число
    Worker Pool Max SizeworkerPoolMaxSizeМаксимальное количество потоков рабочего пула для продолжения потока после того, как сервер Kafka подтвердит сообщение, отправленное ему от KafkaProducer, используя асинхронную неблокирующую обработку.20Целое число
    Interceptor ClassesinterceptorClasses

    Устанавливает перехватчики для отправителя или потребителей.

    Перехватчики-отправители должны быть классами, реализующими org.apache.kafka.clients.producer.ProducerInterceptor

    Перехватчики-потребители должны быть классами, реализующими org.apache.kafka.clients.consumer.ConsumerInterceptor

    Примечание:

    Если вы используете Producer interceptor для потребителя, он сгенерирует исключение ClassCastException во время выполнения.

    Строка
    Schema Registry URLschemaRegistryURL

    URL-адрес используемых серверов реестра схем.

    Формат - host1:port1,host2:port2. В документации по реестрам нескольких схем он известен как schema.registry.url.

    Строка

    Расширенные параметры

    Название в UIНазвание атрибутаОписаниеЗначение по умолчаниюТип данных
    Bridge Error HandlerbridgeErrorHandler

    Позволяет передавать возникшую ошибку из блока «наверх», т. е. на уровень потока, позволяя самостоятельно сконфигурировать собственные правила обработки ошибки через вспомогательные блоки.

    Примечание:

    Это возможно только в том случае, если сторонний компонент позволяет системе получать оповещение о возникновении исключения. Некоторые компоненты обрабатывают это только внутренне, и поэтому применение параметра невозможно. По умолчанию блок будет использовать встроенный обработчик исключений.

    FalseЛогическое значение
    Exception HandlerexceptionHandler

    Пользовательский обработчик исключений.

    Примечание:

    Если опция Bridge Error Handler включена, то эта опция не используется. По умолчанию получатель будет обрабатывать исключения, которые будут логироваться на уровне WARN или ERROR и игнорироваться.

    Строка
    Exchange PatternexchangePattern

    Устанавливает стиль взаимодействия с присоединенной частью потока.

    Возможные значения:

    • InOnly — блок стартует поток и не ждет подтверждения от самого потока.

    • InОut — блок стартует поток и ожидает получения ответа от потока, когда тот закончит выполнять свою логику. Пока ответ не будет получен — следующее срабатывание не произойдет.

    Строка
    Isolation LevelisolationLevel

    Управляет чтением сообщений, записанных транзакционно.

    Если установлено значение read_committed, функция consumer.poll() будет возвращать только те транзакционные сообщения, которые были зафиксированы.

    Если установлено значение read_uncommitted (по умолчанию), функция consumer.poll() будет возвращать все сообщения, даже сообщения о транзакциях, которые были прерваны. Сообщения, не связанные с транзакциями, будут возвращаться безусловно в любом режиме. Сообщения всегда будут возвращаться в порядке смещения. Следовательно, в режиме read_committed функция consumer.poll() будет возвращать сообщения только до последнего стабильного смещения (LSO), которое на единицу меньше смещения первой открытой транзакции. В частности, любые сообщения, появляющиеся после сообщений, относящихся к текущим транзакциям, будут удерживаться до тех пор, пока не будет завершена соответствующая транзакция. В результате получатели, использующие read_committed, не смогут прочитать данные до последнего смещения последнего сообщения, которое было успешно реплицировано (high watermark) при транзакциях в процессе. Кроме того, при задании значения read_committed метод seekToEnd вернет LSO.

    Возможные значения:

    • read_uncommitted

    • read_committed

    read_uncommittedСтрока
    Kafka Manual Commit FactorykafkaManualCommitFactoryПозволяет указать пользовательскую фабрику для создания пользовательских экземпляров KafkaManualCommit на случай, если при выполнении ручных коммитов требуется специальная логика, отличающаяся от реализации по умолчанию, которая поставляется в готовом виде.Строка
    Kafka Client FactorykafkaClientFactory

    Пользовательский компонент, который создаёт экземпляры клиентов отправителей и получателей – org.apache.kafka.clients.producer.KafkaProducer и org.apache.kafka.clients.consumer.KafkaConsumer.

    Это позволяет настроить создание экземпляров с логикой, расширяющей возможности клиентов vanilla Kafka.

    Строка
    Lazy Start ProducerlazyStartProducerОпределяет должен ли отправитель стартовать в отложенном режиме (при получении первого сообщения). Отложенный запуск можно использовать в ситуациях, когда запуск отправителя может завершиться неудачей и привести к сбою при запуске маршрута. Если отложить запуск, то сбой при запуске можно будет обработать во время маршрутизации сообщений с помощью обработчиков ошибок маршрутизации. Обратите внимание, что при обработке первого сообщения создание и старт отправителя могут занять некоторое время и увеличить общее время обработки.FalseЛогическое значение
    SynchronoussynchronousОпределяет, следует ли строго использовать синхронную обработку.FalseЛогическое значение

    Параметры безопасности

    Название в UIНазвание атрибутаОписаниеЗначение по умолчаниюТип данных
    Kerberos Before Relogin Min TimekerberosBeforeReloginMinTimeВремя ожидания потока входа в систему между попытками обновления.60000Целое число
    Kerberos Config LocationkerberosConfigLocationРасположение конфигурационного файла kerberos.Строка
    Kerberos Init CmdkerberosInitCmdПуть к команде Kerberos kinit./usr/bin/kinitСтрока
    Kerberos Principal To Local RuleskerberosPrincipalToLocalRulesСписок правил для преобразования имен участников в короткие имена (обычно это имена пользователей операционной системы). Правила вычисляются по порядку, и первое правило, которое соответствует имени участника, используется для преобразования его в короткое имя. Все последующие правила в списке игнорируются. По умолчанию основные имена в форме {username}/{hostname}{REALM} сопоставляются с {username}. Несколько значений могут быть разделены запятой.DEFAULTСтрока
    Kerberos Renew JitterkerberosRenewJitterПроцент jitter (разница между наибольшей и наименьшей задержкой), добавленный ко времени обновления.0.05Число
    Kerberos Renew Window FactorkerberosRenewWindowFactorПоток входа в систему будет находиться в спящем режиме до тех пор, пока не будет достигнут указанный интервал времени с момента последнего обновления до истечения срока действия пропуска, после чего он попытается продлить действие пропуска.0.8Число
    Sasl Jaas ConfigsaslJaasConfigУкажите параметр kafka sasl.jaas.config, например: org.apache.kafka.common.security.plain.В модуле plainlogin требуется username=USERNAME password=PASSWORD.Строка
    Sasl Kerberos Service NamesaslKerberosServiceNameОсновное имя Kerberos, от имени которого запускается Kafka. Это может быть определено либо в конфигурации JAAS Kafka, либо в конфигурации Kafka.Строка
    Sasl MechanismsaslMechanismМеханизм Simple Authentication and Security Layer (SASL). Допустимые значения приведены в разделе http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml .GSSAPIСтрока
    Security ProtocolsecurityProtocolПротокол, используемый для связи с брокерами. Поддерживаются SASL_PLAINTEXT, PLAINTEXT, SASL_SSL и SSL.PLAINTEXTСтрока
    Ssl Cipher SuitessslCipherSuitesСписок наборов шифров. Это именованная комбинация алгоритмов аутентификации, шифрования, MAC и обмена ключами, используемая для согласования параметров безопасности сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров.Строка
    Ssl Context ParameterssslContextParameters

    Настройка SSL с использованием объекта Camel SSLContextParameters. Если параметр определен, он применяется перед другими параметрами конечной точки SSL.

    Примечание:

    Kafka поддерживает подгрузку keystore только из файла по его полному пути. Поэтому добавьте префикс «file:» перед указанием пути до файла в KeyStoreParameters.resource

    Строка
    Ssl Enabled ProtocolssslEnabledProtocolsСписок протоколов, поддерживающих SSL-соединения. По умолчанию используется TLSv1.2,TLSv1.3 при запуске с Java 11 или новее, TLSv1.2 в противном случае. При использовании значения по умолчанию для Java 11 клиенты и серверы предпочтут TLSv1.3, если оба поддерживают его, и в противном случае вернутся к TLSv1.2 (при условии, что оба поддерживают как минимум TLSv1.2). Это значение по умолчанию должно подходить для большинства случаев. Также смотрите документацию по конфигурации для SSLProtocol.Строка
    Ssl Endpoint AlgorithmsslEndpointAlgorithmАлгоритм идентификации конечной точки для проверки имени хоста сервера с помощью сертификата сервера. Используйте none или false, чтобы отключить проверку имени хоста сервера.httpsСтрока
    Ssl Keymanager AlgorithmsslKeymanagerAlgorithmАлгоритм, используемый Key Manager Factory для SSL-соединений. Значение по умолчанию – это алгоритм Key Manager Factory, настроенный для виртуальной машины Java.SunX509Строка
    Ssl Key PasswordsslKeyPasswordПароль от закрытого ключа в файле хранилища ключей или PEM-ключ, указанный в sslKeystoreKey. Это требуется для клиентов только в том случае, если настроена двусторонняя аутентификация.Строка
    Ssl Keystore LocationsslKeystoreLocationРасположение файла хранилища ключей. Это необязательное значение для клиента и может использоваться для двусторонней аутентификации клиента.Строка
    Ssl Keystore PasswordsslKeystorePasswordПароль хранилища для файла хранилища ключей. Это необязательно для клиента и требуется только в том случае, если настроено sslKeystoreLocation. Пароль хранилища ключей не поддерживается для формата PEM.Строка
    Ssl Keystore TypesslKeystoreTypeФормат файла хранилища ключей. Это необязательный параметр для клиента.JKSСтрока
    Ssl ProtocolsslProtocolПротокол SSL, используемый для генерации SSLContext. Значение по умолчанию - TLSv1.3 при запуске с Java 11 или новее, в противном случае - TLSv1.2. Это значение должно подходить для большинства случаев использования. Допустимые значения в последних версиях JVM - TLSv1.2 и TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 и SSLv3 могут поддерживаться в более старых JVM, но их использование не рекомендуется из-за известных уязвимостей в системе безопасности. При использовании значения по умолчанию для этой конфигурации и sslEnabledProtocols клиенты перейдут на TLSv1.2, если сервер не поддерживает TLSv1.3. Если для этой конфигурации установлено значение TLSv1.2, клиенты не будут использовать TLSv1.3, даже если это одно из значений в sslEnabledProtocols, а сервер поддерживает только TLSv1.3.Строка
    Ssl ProvidersslProviderИмя поставщика безопасности, используемого для SSL-подключений. Значение по умолчанию - поставщик безопасности виртуальной машины JVM по умолчанию.Строка
    Ssl Trustmanager AlgorithmsslTrustmanagerAlgorithmАлгоритм, используемый Trust Manager Factory для SSL-подключений. Значение по умолчанию - это алгоритм Trust Manager Factory, настроенный для виртуальной машины Java.PKIXСтрока
    Ssl Truststore LocationsslTruststoreLocationРасположение файла хранилища доверенных сертификатов.Строка
    Ssl Truststore PasswordsslTruststorePasswordПароль для файла хранилища доверенных сертификатов. Если пароль не задан, настроенный файл хранилища доверенных сертификатов по-прежнему будет использоваться, но проверка целостности будет отключена. Пароль хранилища не поддерживается для формата PEM.Строка
    Ssl Truststore TypesslTruststoreTypeФормат файла хранилища доверенных сертификатов. Значение по умолчанию - JKS.JKSСтрока