Debizium SQL Server
Общее описание
Блок Debizium SQL Server подключается к SQL Server и отслеживает изменения в реальном времени (INSERT, UPDATE, DELETE). Использует Debezium для превращения логов транзакций в поток событий, пригодный для интеграции с другими системами.
Блок Debizium SQL Server может использоваться в качестве:
- блока – получателя.
Настройка блока
В данном разделе описаны параметры блока Debizium SQL Server, которые необходимо заполнить при его настройке.
Базовые параметры
| Название в UI | Название атрибута | Описание | Значение по умолчанию | Тип данных |
|---|---|---|---|---|
| Name | name | Обязательный параметр Уникальное имя коннектора Debezium. Используется системой Bercut ESB для его идентификации и регистрации. Попытка зарегистрировать коннектор с уже существующим именем приведет к ошибке. | Строка | |
| Connection | connection | Позволяет выбрать предсозданное подключение к серверу или создать новое с помощью визарда Connection Manager. | Список | |
| Description | description | Краткое описание блока. | Строка | |
| Additional Properties | additionalProperties | Позволяет задать свойства для компонента Debezium, которые нельзя установить напрямую в конфигурации системы. | Строка | |
| Internal Key Converter | internalKeyConverter | Класс конвертера для сериализации и десериализации ключевых данных смещений (offsets). Определяет формат хранения ключей в системе. | org.apache.kafka.connect.json.JsonConverter | Строка |
| Internal Value Converter | internalValueConverter | Класс‑конвертер для сериализации и десериализации данных смещений (offsets). Определяет формат хранения служебной информации о позициях чтения в источнике данных. | org.apache.kafka.connect.json.JsonConverter | Строка |
| Offset Commit Policy | offsetCommitPolicyClass | Класс Java, определяющий условия коммита офсетов (на основе количества обработанных событий и времени с последнего коммита). Должен реализовывать интерфейс OffsetCommitPolicy. | Строка | |
| Offset Commit Timeout Ms | offsetCommitTimeout | Максимальное время (в мс) ожидания фиксации записей и смещений разделов в хранилище смещений перед отменой операции и повторной попыткой в будущем | 5000 | Целое число |
| Offset Flush Interval Ms | offsetCommitInterval | Интервал попытки фиксации смещений (офсетов) | 60000 | Целое число |
| Offset Storage | offsetStorage | Класс, отвечающий за хранение смещений (offsets) коннектора. | org.apache.kafka.connect.storage.FileOffsetBackingStore | Строка |
| Offset Storage File Name | offsetStorageFileName | Путь к файлу, в котором хранятся смещения (offsets), если используется файловое хранилище. | Строка | |
| Offset Storage Partitions | offsetStoragePartitions | Количество партиций для топика смещений, если используется KafkaOffsetBackingStore. | Целое число | |
| Offset Storage Replication Factor | offsetStorageReplicationFactor | Фактор репликации для топика смещений. | Целое число | |
| Offset Storage Topic | offsetStorageTopic | Имя топика, используемого для хранения смещений. | Строка | |
| Binary Handling Mode | binaryHandlingMode | Способ обработки бинарных данных (bytes — как массив байтов). | bytes | Строка |
| Column Exclude List | columnExcludeList | Список столбцов (по шаблону), которые следует исключить из отслеживания изменений. | Строка | |
| Column Include List | columnIncludeList | Список столбцов (по шаблону), которые должны отслеживаться на изменения. | Строка | |
| Column Propagate Source Type | columnPropagateSourceType | Указывает, нужно ли передавать исходный тип данных столбца в схеме сообщения. | Строка | |
| Converters | converters | Опциональный список пользовательских конвертеров для преобразования данных (вместо стандартных). | Строка | |
| Custom Metric Tags | customMetricTags | Пользовательские теги (ключ‑значение) для настройки имени MBean‑объекта (метрики). | Строка | |
| Database Dbname | databaseDbname | Имя базы данных SQL Server, из которой фиксируются изменения. | Строка | |
| Database Hostname | databaseHostname | Хост (адрес) сервера SQL Server. | Строка | |
| Database Instance | databaseInstance | Имя экземпляра Microsoft SQL Server, к которому выполняется подключение. Используется в сценариях, когда на одном сервере развернуто несколько экземпляров SQL Server (именованные экземпляры). . Если экземпляр стандартный (по умолчанию), поле можно оставить пустым — будет использовано подключение к экземпляру по умолчанию | Строка | |
| Database Names | databaseNames | Имена БД SQL Server, из которых коннектор должен захватывать изменения. Можно указать несколько имен через разделитель | Строка | |
| Database Password | databasePassword | Обязательный параметр Пароль пользователя для подключения к SQL Server. | Строка | |
| Database Port | databasePort | Порт сервера SQL Server. | 5432 | Целое число |
| Database Query Timeout Ms | databaseQueryTimeoutMs | Тайм‑аут (в мс) для SQL‑запросов к БД. Может задаваться в формате времени (например, 10m). | 10m | Строка |
| Database User | databaseUser | Имя пользователя для подключения к серверу SQL Server. | Строка | |
| Data Query Mode | dataQueryMode | Определяет способ, которым Debezium SQL Server запрашивает данные из механизма Change Data Capture (CDC). Возможные значения:
| function | Строка |
| Datatype Propagate Source Type | datatypePropagateSourceType | Передача исходного типа и длины данных БД в схеме сообщения (для кастомных типов). | Строка | |
| Decimal Handling Mode | decimalHandlingMode | Способ обработки десятичных чисел. Возможные значения:
| precise | Строка |
| Errors Max Retries | errorsMaxRetries | Максимальное число попыток повторной обработки ошибки ( "-1" — бесконечно). | -1 | Целое число |
| Event Processing Failure Handling Mode | eventProcessingFailureHandlingMode | Стратегия обработки ошибок при обработке событий коннектором. Возможные значения:
| fail | Строка |
| Heartbeat Action Query | heartbeatActionQuery | SQL‑запрос, выполняемый при отправке heartbeat-сигнала для поддержания активности соединения с СУБД и подтверждения работоспособности коннектора. | Строка | |
| Heartbeat Interval Ms | heartbeatIntervalMs | Определяет интервал в миллисекундах, с которым коннектор периодически отправляет сигналы «сердцебиения» (heartbeat) в специальный топик. Эти сигналы подтверждают активность коннектора, помогают отслеживать его состояние, а также предотвращают разрыв соединения из‑за бездействия. Зачение 0 отключает отправку heartbeat‑сигналов | 0ms | Строка |
| Heartbeat Topics Prefix | heartbeatTopicsPrefix | Задает префикс для префикс для темплейтов, связанных с heartbeat-событиями. Позволяет изолировать служебные сообщения от основных данных. | __debezium-heartbeat | Строка |
| Include Schema Changes | includeSchemaChanges | Определяет, должен ли Debezium Oracle Connector публиковать изменения схемы базы данных (DDL) в топик с именем, совпадающим с ID сервера БД. При включении (true) каждое изменение схемы записывается с ключом (имя БД) и значением (логическое описание новой схемы и, опционально, DDL‑операции). Параметр не влияет на внутреннее ведение истории схемы коннектором. | True | Логическое значение |
| Include Schema Comments | includeSchemaComments | Определяет, должен ли Debezium Oracle Connector извлекать комментарии к таблицам и колонкам и добавлять их в объекты метаданных. Важно: включение (true) увеличивает потребление памяти, поскольку для каждой колонки создается дополнительный строковый объект. | False | Логическое значение |
| Incremental Snapshot Allow Schema Changes | incrementalSnapshotAllowSchemaChanges | Определяет, разрешены ли изменения схемы БД (DDL‑операции) во время выполнения инкрементального снимка. При включении (true) коннектор адаптируется к изменениям структуры таблиц (добавление/удаление колонок и т. д.) без остановки репликации. При отключении (false) — изменения схемы могут привести к ошибкам. False | Логическое значение | |
| Incremental Snapshot Chunk Size | incrementalSnapshotChunkSize | Задает количество строк в одном чанке (порции данных) при выполнении инкрементального снимка. Меньшие значения снижают нагрузку на БД и сеть, но увеличивают число запросов. Большие значения ускоряют обработку, но могут перегрузить систему. Пример: 1024 строк на чанк. | 1024 | Логическое значение |
| Incremental Snapshot Option Recompile | incrementalSnapshotOptionRecompile | Указывает, следует ли принудительно перекомпилировать запросы или объекты БД перед выполнением инкрементального снимка. Используется для устранения проблем с устаревшими планами выполнения запросов или кэшированными объектами. | False | Логическое значение |
| Incremental Snapshot Watermarking Strategy | incrementalSnapshotWatermarkingStrategy | Определяет стратегию установки водяных знаков (watermark) для отслеживания прогресса инкрементного снимка. Возможные значения:
| INSERT_INSERT | Строка |
| Max Batch Size | maxBatchSize | Максимальное количество записей из источника, обрабатываемых в одном пакете за итерацию. Увеличение значения может повысить пропускную способность, но увеличит нагрузку на память и задержку обработки. Уменьшение значения снижает нагрузку, но может снизить общую производительность. | 2048 | Целое число |
| Max Iteration Transactions | maxIterationTransactions | Ограничивает количество транзакций, обрабатываемых за одну итерацию при потоковой передаче изменений из нескольких таблиц БД. Позволяет снизить потребление памяти коннектором. | 500 | Целое число |
| Max Queue Size | maxQueueSize | Максимальный размер очереди для событий изменений, прочитанных из журнала базы данных, но еще не записанных и не переданных дальше. Должен быть всегда больше максимального размера батча (maxBatchSize). Позволяет буферизовать данные при временных задержках в обработке или отправке. | 8192 | Целое число |
| Max Queue Size In Bytes | maxQueueSizeInBytes | Максимальный объем очереди (в байтах) для событий изменений, прочитанных из журнала базы данных, но еще не записанных и не переданных дальше. Значение 0 означает, что ограничение по объему отключено — очередь может расти без жестких лимитов по размеру (ограничения могут накладываться другими механизмами). Позволяет контролировать потребление памяти и предотвращать переполнение при пиковых нагрузках. | 0 | Целое число |
| Message Key Columns | messageKeyColumns | Список выражений (разделенных точкой с запятой), определяющих полные имена таблиц и колонок, которые будут использоваться в качестве ключа сообщения. Формат каждого выражения: DB_NAME.TABLE_NAME:COLUMN_NAME или SCHEMA_NAME.TABLE_NAME:COLUMN_NAME. | Строка | |
| Notification Enabled Channels | notificationEnabledChannels | Список имен каналов уведомлений, которые активированы. Возможные значения зависят от реализации системы. | Строка | |
| Notification Sink Topic Name | notificationSinkTopicName | Имя топика, в который отправляются уведомления. Обязательно, если в списке включенных каналов (notificationEnabledChannels) присутствует значение sink. | Строка | |
| Poll Interval Ms | pollIntervalMs | Время (в миллисекундах), которое коннектор ожидает появления новых событий изменений после того, как не получил ни одного события в предыдущем цикле опроса. Коннектор приостанавливается на указанный период перед следующим запросом к источнику данных. | 500ms | Строка |
| Post Processors | postProcessors | Опциональный список пост‑процессоров. Процессоры определяются через параметр .type , настраиваются дополнительными опциями. | Строка | |
| Provide Transaction Metadata | provideTransactionMetadata | Включает извлечение метаданных транзакции вместе с подсчетом событий. | False | Логическое значение |
| Retriable Restart Connector Wait Ms | retriableRestartConnectorWaitMs | Время ожидания (в миллисекундах) перед перезапуском коннектора после возникновения повторяемой ошибки (retriable exception). | 10s | Строка |
| Schema History Internal | schemaHistoryInternal | Имя класса SchemaHistory, используемого Debezium для сохранения и восстановления изменений схемы БД. | io.debezium.storage.kafka.history.KafkaSchemaHistory | Строка |
| Schema History Internal File Filename | schemaIncludeList | Список схем, для которых должны захватываться события изменений. Используется для фильтрации по схемам — реплицируются только указанные схемы. | Строка | |
| Schema History Internal Skip Unparseable Ddl | schemaHistoryInternalSkipUnparseableDdl | Управляет поведением коннектора Debezium при обнаружении в бинарном логе (binlog) DDL‑операции, которую коннектор не может распознать/распарсить. Если false, то при встрече с нераспознаваемой DDL‑операцией коннектор останавливает работу и переходит в состояние ошибки. Это гарантирует целостность метаданных, но требует ручного вмешательства для восстановления. | False | Логическое значение |
| Schema History Internal Store Only Captured Databases Ddl | schemaHistoryInternalStoreOnlyCapturedDatabasesDdl | Управляет тем, какие DDL‑операции Debezium будет сохранять в истории схемы базы данных. Если false, коннектор сохраняет все входящие DDL‑операции из бинарного лога (binlog), независимо от того, к какой базе данных или схеме они относятся. При true коннектор сохраняет только DDL‑операции, которые затрагивают таблицы из отслеживаемых (captured) баз данных/схем, настроенных в коннекторе. | False | Логическое значение |
| Schema History Internal Store Only Captured Tables Ddl | schemaHistoryInternalStoreOnlyCapturedTablesDdl | Определяет, какие DDL‑операции Debezium сохраняет в истории схемы БД. При false: сохраняются все входящие DDL‑операции из binlog. Если true: сохраняются только DDL‑операции, затрагивающие отслеживаемые таблицы. | False | Логическое значение |
| Schema Name Adjustment Mode | schemaNameAdjustmentMode | Определяет способ корректировки имен схем для совместимости с конвертером сообщений. Возможные значения:
| none | Строка |
| Signal Data Collection | signalDataCollection | Имя коллекции данных, используемой для отправки сигналов/команд в Debezium. Для многораздельных коннекторов можно указать несколько коллекций через запятую. При отсутствии значения сигнализация отключена. | Строка | |
| Signal Enabled Channels | signalEnabledChannels | Список имен каналов, через которые принимаются сигналы управления. Канал source включен по умолчанию. Возможные значения:
| source | Строка |
| Signal Poll Interval Ms | signalPollIntervalMs | Интервал (в миллисекундах) для проверки новых сигналов в зарегистрированных каналах. | 5s | Строка |
| Skipped Operations | skippedOperations | Список операций (через запятую), пропускаемых во время потоковой передачи. Возможные значения:
| t | Строка |
| Snapshot Delay Ms | snapshotDelayMs | Задержка (в миллисекундах) перед началом снимка данных. Позволяет отложить старт репликации для подготовки окружения. | 0ms | Строка |
| Snapshot Fetch Size | snapshotFetchSize | Максимальное число записей, загружаемых в память при выполнении снимка данных (snapshot). | Целое число | |
| Snapshot Include Collection List | snapshotIncludeCollectionList | Список таблиц/коллекций, для которых должен быть выполнен снимок данных при создании или перезапуске коннектора. | Строка | |
| Snapshot Isolation Mode | snapshotIsolationMode | Определяет уровень изоляции транзакции и длительность блокировок таблиц при выполнении снапшота в SQL Server. Возможные значения:
| repeatable_read | Строка |
| Snapshot Lock Timeout Ms | snapshotLockTimeoutMs | Максимальное время (в миллисекундах) ожидания блокировок таблиц в начале снимка. Если блокировки не получены за это время, снимок прерывается. | 10s | Строка |
| Snapshot Max Threads | snapshotMaxThreads | Максимальное количество потоков, используемых для выполнения снимка данных. | 1 | Целое число |
| Snapshot Mode | snapshotMode | Определяет условия запуска снимка при старте коннектора. Возможные значения:
| initial | Строка |
| Snapshot Mode Configuration Based Snapshot Data | snapshotModeConfigurationBasedSnapshotData | Указывает, нужно ли снимать данные при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Configuration Based Snapshot On Data Error | snapshotModeConfigurationBasedSnapshotOnDataError | Указывает, нужно ли снимать данные в случае ошибки при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Configuration Based Snapshot On Schema Error | snapshotModeConfigurationBasedSnapshotOnSchemaError | Указывает, нужно ли снимать схему в случае ошибки при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Configuration Based Snapshot Schema | snapshotModeConfigurationBasedSnapshotSchema | Указывает, нужно ли снимать схему при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Configuration Based Start Stream | snapshotModeConfigurationBasedStartStream | Указывает, нужно ли запускать поток изменений после снимка при режиме configuration_based. | False | Логическое значение |
| Snapshot Mode Custom Name | snapshotModeCustomName | Имя пользовательской реализации (реализует интерфейс Snapshotter), определяющей логику снимка (используется при snapshotMode=custom). | Строка | |
| Snapshot Select Statement Overrides | snapshotSelectStatementOverrides | Список таблиц (через запятую) с переопределенными SELECT‑запросами для снимка. Позволяет задать кастомные запросы для отдельных таблиц. Формат: DB_NAME.TABLE_NAME или SCHEMA_NAME.TABLE_NAME. | Строка | |
| Snapshot Tables Order By Row Count | snapshotTablesOrderByRowCount | Определяет порядок обработки таблиц в начальном снимке по количеству строк. Возможные значения:
| disabled | Строка |
| Sourceinfo Struct Maker | sourceinfoStructMaker | Имя класса SourceInfoStructMaker, который возвращает схему и структуру SourceInfo для событий репликации. | io.debezium.connector.sqlserver.SqlServerSourceInfoStructMaker | Строка |
| Streaming Delay Ms | streamingDelayMs | Задержка (в миллисекундах) между завершением снимка данных и началом потоковой передачи изменений. Позволяет выполнить дополнительные подготовительные действия. | 0ms | Строка |
| Table Exclude List | tableExcludeList | Список регулярных выражений (через запятую), сопоставляемых с полными именами таблиц, которые следует исключить из мониторинга. Позволяет фильтровать таблицы по шаблонам. | Строка | |
| Table Include List | tableIncludeList | Список таблиц, для которых должны захватываться изменения данных. Используется для точной настройки репликации — обрабатываются только указанные таблицы. | Строка | |
| Time Precision Mode | timePrecisionMode | Определяет способ представления временных типов данных (TIME, DATE, TIMESTAMP). Возможные значения:
| adaptive | Строка |
| Tombstones On Delete | tombstonesOnDelete | Указывает, следует ли представлять операции удаления как два события: событие удаления и последующее «надгробие» (tombstone). Если true, система может полностью удалить все события с данным ключом после удаления записи в источнике. | False | Логическое значение |
| Topic Naming Strategy | topicNamingStrategy | Имя класса, определяющего правила формирования имен топиков для разных типов событий (изменения данных, схемы, транзакции, heartbeat и т. д.). | io.debezium.schema.SchemaTopicNamingStrategy | Строк |
| Topic Prefix | topicPrefix | Обязательный параметр Префикс топиков, идентифицирующий сервер/кластер БД. Должен быть уникальным для всех коннекторов. Используется как основа для имен топиков. Допустимы: буквы, цифры, дефисы, точки, подчеркивания. | Строка | |
| Transaction Metadata Factory | transactionMetadataFactory | Класс, отвечающий за создание контекста транзакции и структур/схем транзакций. Определяет, как будут представлены транзакции в событиях репликации. | io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory | Строка |
Расширенные параметры
| Название в UI | Название атрибута | Описание | Значение по умолчанию | Тип данных |
|---|---|---|---|---|
| Bridge Error Handler | bridgeErrorHandler | Определяет стратегию обработки ошибок на уровне интеграционного моста (bridge) в Bercut ESB. Отвечает за реакцию системы на сбои при передаче сообщений между компонентами. Может включать повторные попытки отправки, перенаправление в очередь ошибок, логирование и т. д. Функция доступна только для тех сторонних компонентов, которые позволяют системе получать уведомления о возникших исключениях. Некоторые компоненты обрабатывают ошибки внутри себя — в таких случаях использование bridgeErrorHandler невозможно. | False | Логическое значение |
| Exception Handler | exceptionHandler | Определяет стратегию обработки исключений на уровне компонента. По умолчанию используется стандартный обработчик org.apache.camel.spi.ExceptionHandler, который логирует ошибки на уровнях WARN или ERROR и игнорирует их. Позволяет настроить кастомную логику обработки (повторные попытки, перенаправление в очередь ошибок и т. д.). | Строка | |
| Exchange Pattern | exchangePattern | Задает шаблон взаимодействия между компонентами в интеграционном потоке. Определяет способ передачи сообщений и ожидания ответа. Возможные значения:
| Строка |