Как передавать большие файлы с помощью блока AWS S3
Платформа Bercut ESB позволяет передавать файлы большого размера – до 100 мБ с использованием блоков – обработчиков файлов SFTP, AWS S3, SMB и при использовании других интеграционных блоков в качестве вспомогательных. Файлы можно передавать целиком или по частям.
Настройка обработки целых файлов в кластере
Для реализации отказоустойчивой кластерной обработки файлов несколькими узлами создан пользовательский идемпотентный репозиторий.
Репозиторий предназначен для выполнения следующих функций:
-
Блокировать файл на время обработки его узлом
-
Позволять другим узлам переходить к следующему файлу при обнаружении заблокированного.
Если на узле произойдет сбой, то через заданное время файл разблокируется и может быть обработан любым другим узлом, в том числе и тем, на котором произошел сбой, если к моменту восстановления файл не был занят.
Таблица репозитория содержит:
-
Композитный ключ: имя узла + имя файла
-
Индекс по имени файла
Чтобы реализовать обработку больших целых файлов с помощью блока AWS S3, откройте редактор потоков и настройте поток с конфигурацией, как показано на рисунке ниже. Блоки Logger и Delay являются вспомогательными и не несут на себе функциональную нагрузку по передаче файлов.

- Добавьте в качестве инициирующего блока и сконфигурируйте блок AWS S3, как показано на рисунках ниже.
Красным цветом отмечены критически важные настройки, которые должны быть определены. Для строковых параметров приведены примеры заполнения. Там, где значения должны иметь строго определенное значение, эти значения приведены в списке над картинками.
-
Bucket Name or Arn
-
Auto Create Bucket = True
-
Forth Path Style = True
-
Override Endpoint = True

-
Region = us-east-1
-
Uri Endpoint Override

-
Max Messages per Poll – сколько файлов за один раз возьмет нода (значение по умолчанию 10)
-
Proxy Protocol – выставляется в зависимости от настроек AWS S3

- Delay – интервал опроса бакета

- In Progress Repository = #jdbcClusterIdempotentRepository

-
Access Key
-
Secret Key
-
Trust All Certificates – выставляется в зависимости от настроек AWS S3

-
Нажмите Применить.
-
При необходимости добавьте блоки логгирования Logger и блок временной задержки между обработкой файлов Delay. В каждом блоке сконфигурируйте настройки и примените изменения, нажав Применить.
Примеры конфигурации дополнительных блоков показаны на рисунках ниже.
Блок Logger:

Блок Delay:

- Добавьте второй блок AWS S3 и сконфигурируйте его параметры, как показано на рисунках ниже.
Красным цветом отмечены критически важные настройки, которые должны быть определены. Host
-
Bucket Name or Arn – такой же, как в первом блоке AWS S3
-
Auto Create Bucket = True
-
Forth Path Style = True
-
Override Endpoint = True

-
Region = us-east-1
-
Uri Endpoint Override – такой же, как в первом блоке AWS S3

- Proxy Protocol – выставляется в зависимости от настроек AWS S3

-
Access Key
-
Secret Key
-
Trust All Certificates – выставляется в зависимости от настроек AWS S3

-
Нажмите Применить.
-
Нажмите Сохранить. Версия потока сохранена.
-
Чтобы проверить код потока, перейдите режим отображения кода, нажав на кнопку Код в правом верхнем углу рабочей области.

В рабочей области отобразится код потока.

- Проверьте критические параметры в коде:
configurationItems:
- route:
id: 74d63548-0610-460f-a0fc-b096dc6dcdf7
from:
id: from-s3-endpoint
uri: aws2-s3://input-bucket
parameters:
delay: '7000'
greedy: false
region: us-east-1
timeUnit: MILLISECONDS
accessKey: minioadmin
scheduler: none
secretKey: minioadmin
ignoreBody: false
includeBody: true
pojoRequest: false
repeatCount: 0
initialDelay: 1000
autocloseBody: true
moveAfterRead: false
proxyProtocol: HTTP
useFixedDelay: true
forcePathStyle: true
includeFolders: true
maxConnections: 60
startScheduler: true
deleteAfterRead: true
runLoggingLevel: TRACE
autoCreateBucket: true
overrideEndpoint: true
bridgeErrorHandler: false
maxMessagesPerPoll: '1'
uriEndpointOverride: http://serv-1.dev.local:9000
inProgressRepository: '#jdbcClusterIdempotentRepository'
trustAllCertificates: true
useSessionCredentials: false
sendEmptyMessageWhenIdle: false
useDefaultCredentialsProvider: false
useProfileCredentialsProvider: false
steps:
- log:
id: c0f64cb6-f843-4819-b4ea-ef68c7be7c50
message: "Файл ${header.CamelAwsS3Key} обработан нодой ${hostname} в ${date:now:yyyy-MM-dd HH:mm:ss}"
disabled: 'false'
loggingLevel: INFO
- delay:
id: c967d243-6b21-4d19-b76d-dc29c0e1e2ef
disabled: false
expression:
constant:
trim: 'true'
expression: '10000'
resultType: java.lang.Long
asyncDelayed: true
callerRunsWhenRejected: true
- to:
id: 5d57dd31-a062-439e-8589-bd16be3c9d69
uri: aws2-s3://processed-bucket
parameters:
delay: '500'
greedy: false
region: us-east-1
partSize: 26214400
timeUnit: MILLISECONDS
useSSES3: false
accessKey: minioadmin
batchSize: 1000000
scheduler: none
secretKey: minioadmin
useAwsKMS: false
bufferSize: 1000000
ignoreBody: false
pojoRequest: false
repeatCount: 0
initialDelay: 1000
proxyProtocol: HTTP
useFixedDelay: true
forcePathStyle: true
namingStrategy: progressive
startScheduler: true
useCustomerKey: false
multiPartUpload: false
runLoggingLevel: TRACE
autoCreateBucket: true
deleteAfterWrite: false
overrideEndpoint: true
restartingPolicy: override
lazyStartProducer: false
batchMessageNumber: 10
streamingUploadMode: false
uriEndpointOverride: http://serv-1.dev.local:9000
trustAllCertificates: true
useSessionCredentials: false
useDefaultCredentialsProvider: false
useProfileCredentialsProvider: false
- Активируйте поток.
Файлы перемещаются из бакета input-bucket в processed-bucket. Узлы кластера блокируют файлы на время обработки. Если на узле происходит сбой, через заданное время файл разблокируется и может быть обработан любым другим узлом . Блокировка файлов является динамической и зависит от времени обработки файла. После обработки запись из таблицы блокировки файлов удаляется, блокировка снимается.
Настройка обработки структурированных файлов по частям в кластере
Для обработки структурированных файлов по частям реализован репозиторий с хранением состояния элементов. Репозиторий организован следующим образом:
-
Таблица содержит композитный ключ (имя файла + ID элемента)
-
Добавлен индекс по элементу
-
Файл загружается в RAM целиком для последовательной генерации ID элементов.
Ниже приведены шаги по настройке обработки структурированного файла. Для примера обработки структурированного файла был собран поток, который находит все токены ObjectEvent или AggregationEvent в загруженном xml-файле. Сам файл никуда не перемещается.
Развернутая схема потока приведена на рисунке ниже.

Блоки Logger и Delay являются вспомогательными для процесса и добавляются по необходимости. В приведённом ниже примере они добавлены в сценарий.
Схема потока со свернутыми блоками, входящими в блок Split приведена на рисунке ниже.
Поток включает следующие блоки:
AWS S3 🡪 Logger 🡪 Convert Body To 🡪 Split 🡪 Logger

Схема блоков, входящих в блок Split приведена на рисунке ниже.
Блок Split включает следующие блоки:
Set Header 🡪 Idempotent Consumer (Logger 🡪 Delay 🡪 Logger)

Чтобы подготовить обработку больших файлов по частям, откройте редактор потоков и настройте поток с конфигурацией, как описано ниже.
-
Добавьте на рабочую область блок AWS S3 в качестве инициирующего блока. Сконфигурируйте его параметры так же, как в первом примере – для обработки целых файлов.
-
Добавьте блоки Logger и блок Delay добавляем и заполняем при необходимости (ниже приведен код потока с логгерами и delay).
Добавьте и сконфигурируйте критически важные блоки.
- Добавьте после AWS S3 и первого логгера блок Convert Body To.
- Установите значение параметра Type: java.Lang.String

- Добавьте блок Split. Установите следующие параметры:
-
Language: xpath
-
Expression: //ObjectEvent | //AggregationEvent

- Добавьте блок Set Header первым в блок Split. Установите следующие параметры:
Name: itemIndex
Language: simple
Expression: ${exchangeProperty.CamelSplitIndex}

- Добавьте блок Idempotent Consumer после блока Set Header. Установите следующие параметры:
Language: simple
Expression: ${header.CamelFileName}-${header.itemIndex}
Idempotent Repository: JdbcXmlItemIdempotentRepository

-
Добавьте вспомогательные блоки в блок Idempotent Consumer.
-
Сохраните версию потока.
-
Проверьте критические параметры в коде:
configurationItems:
- route:
id: fd40addc-b02e-48c0-a3fe-09f360c163ce
from:
id: from-s3-endpoint
uri: aws2-s3://input-bucket
parameters:
delay: '500'
greedy: false
region: us-east-1
timeUnit: MILLISECONDS
accessKey: minioadmin
scheduler: none
secretKey: minioadmin
ignoreBody: false
includeBody: true
pojoRequest: false
repeatCount: 0
initialDelay: 1000
autocloseBody: true
moveAfterRead: false
proxyProtocol: HTTP
useFixedDelay: true
forcePathStyle: true
includeFolders: true
maxConnections: 60
startScheduler: true
deleteAfterRead: true
runLoggingLevel: TRACE
autoCreateBucket: false
overrideEndpoint: true
bridgeErrorHandler: false
maxMessagesPerPoll: '1'
uriEndpointOverride: http://serv-1.dev.local:9000
inProgressRepository: '#jdbcClusterIdempotentRepository'
trustAllCertificates: false
useSessionCredentials: false
sendEmptyMessageWhenIdle: false
useDefaultCredentialsProvider: false
useProfileCredentialsProvider: false
steps:
- log:
id: 5c352178-7c81-407b-bf01-fe801dfe6a24
message: "Начата обработка файла ${header.CamelAwsS3Key}"
disabled: 'false'
loggingLevel: INFO
- convertBodyTo:
id: 7dcc6264-fc1b-4a03-97f7-fdba9dfce401
type: java.lang.String
disabled: false
mandatory: true
- split:
id: c0fcba4a-8c26-4f98-b2c1-23580db82639
disabled: false
delimiter: ','
streaming: false
expression:
xpath: "//ObjectEvent | //AggregationEvent"
synchronous: false
shareUnitOfWork: false
stopOnException: false
parallelProcessing: false
aggregationStrategyMethodAllowNull: false
steps:
- setHeader:
id: 6729289b-4a39-40b4-ac2a-36de9da772b2
name: itemIndex
disabled: false
expression:
simple:
trim: 'true'
expression: "${exchangeProperty.CamelSplitIndex}"
- idempotentConsumer:
id: 4c3fcc0d-1217-47be-ac2a-80efd5b38ffb
eager: true
disabled: false
expression:
simple:
trim: 'true'
expression: "${header.CamelAwsS3Key}-${header.itemIndex}"
skipDuplicate: true
completionEager: false
removeOnFailure: true
idempotentRepository: '#jdbcXmlItemIdempotentRepository'
steps:
- log:
id: e9283031-7be0-4534-9d32-35ceabae1421
message: "Начало обработки элемента ${header.itemIndex} из S3 файла ${header.CamelAwsS3Key}"
disabled: 'false'
loggingLevel: INFO
- delay:
id: 03f67a40-4223-4299-84c3-d5d2ae272e55
disabled: false
expression:
constant:
trim: 'true'
expression: '500'
resultType: java.lang.Long
asyncDelayed: true
callerRunsWhenRejected: true
- log:
id: 500b76fe-4c16-437c-a092-f2e9837d5854
message: "Элемент ${header.itemIndex} обработан"
disabled: 'false'
loggingLevel: INFO
- log:
id: 5d57dd31-a062-439e-8589-bd16be3c9d69
message: "Файл ${header.CamelAwsS3Key} полностью обработан"
disabled: 'false'
loggingLevel: INFO
- Активируйте поток.
Узлы кластера берут файлы из бакета input-bucket и блокируют их на время обработки, далее узел последовательно обрабатывает все токены с указанным именем взятого файла (в примере ObjectEvent или AggregationEvent), заносит индекс обработанного токена в БД. Если на обрабатывавшем файл узле в процессе обработки происходит сбой, то по истечении времени блокировки файла обработку начнет любой другой узел с того индекса, где остановился предыдущий узел.