Как передавать большие файлы с помощью блока SFTP
Платформа Bercut ESB позволяет передавать файлы большого размера – до 100 мБ с использованием блока SFTP и других интеграционных блоков. Файлы можно передавать целиком или по частям.
Настройка обработки целых файлов в кластере
Для реализации отказоустойчивой кластерной обработки файлов несколькими узлами создан пользовательский идемпотентный репозиторий.
Репозиторий предназначен для выполнения следующих функций:
-
Блокировать файл на время обработки его узлом
-
Позволять другим узлам переходить к следующему файлу при обнаружении заблокированного.
Если на узле произойдет сбой, то через заданное время файл разблокируется и может быть обработан любым другим узлом, в том числе и тем, на котором произошел сбой, если к моменту восстановления файл не был занят.
Таблица репозитория содержит:
-
Композитный ключ: имя узла + имя файла
-
Индекс по имени файла
Чтобы реализовать обработку больших целых файлов, откройте редактор потоков и настройте поток с конфигурацией, как показано на рисунке ниже. Блоки Logger и Delay являются вспомогательными и не несут на себе функциональную нагрузку по передаче файлов.

- Добавьте в качестве инициирующего блока и сконфигурируйте блок SFTP, как показано на рисунках ниже.
Красным цветом отмечены критически важные настройки, которые должны быть определены. Для строковых параметров приведены примеры заполнения.
-
Host
-
Port
-
Directory name

-
Idempotent = True
-
Idempotent Eager = True
-
Idempotent Repository = #jdbcClusterIdempotentRepository
-
Max Messages per Poll
-
Move – временная папка, куда перемещаются файлы на время блокировки

- Read Lock Remove On Commit = True

- Download = True

- Stepwise = False

-
Password
-
Username
-
Use User Known Hosts File = False

-
Нажмите Применить.
-
При необходимости добавьте блоки логгирования Logger и блок временной задержки между обработкой файлов Delay. В каждом блоке сконфигурируйте настройки и примените изменения, нажав Применить.
Примеры конфигурации дополнительных блоков показаны на рисунках ниже.
Первый блок Logger:

Блок Delay:

Второй блок Logger:

- Добавьте второй блок SFTP и сконфигурируйте его параметры, как показано на рисунках ниже.
Красным цветом отмечены критически важные настройки, которые должны быть определены. Host
-
Host – такой же, как в первом блоке SFTP
-
Port – такой же, как в первом блоке SFTP
-
Directory name

-
Idempotent = True
-
Idempotent Eager = True

-
Password
-
Username
-
Use User Known Hosts File = False

-
Нажмите Применить.
-
Нажмите Сохранить. Версия потока сохранена.
-
Чтобы проверить код потока, перейдите режим отображения кода, нажав на кнопку Код в правом верхнем углу рабочей области.

В рабочей области отобразится код потока.

- Проверьте критические параметры в коде:
configurationItems:
- route:
id: 54e3793c-184e-48d0-9d5d-710f892b039c
from:
id: 54e3793c-184e-48d0-9d5d-710f892b039c
uri: sftp
parameters:
host: red-1.dev.local
move: ".processing/${file:name}.${hostname}"
noop: false
port: "2222"
delay: "5000"
binary: false
delete: false
greedy: false
preSort: false
shuffle: false
timeout: "30000"
useList: true
download: true
maxDepth: 2147483647
password: pass
readLock: none
stepwise: false
timeUnit: MILLISECONDS
username: user
recursive: false
scheduler: none
separator: UNIX
soTimeout: "300000"
autoCreate: false
disconnect: false
idempotent: true
passiveMode: false
repeatCount: 0
initialDelay: 1000
directoryName: incoming
useFixedDelay: true
connectTimeout: "10000"
readLockMinAge: 0
reconnectDelay: "1000"
startScheduler: true
streamDownload: false
fastExistsCheck: false
idempotentEager: true
readLockTimeout: 10000
runLoggingLevel: TRACE
jschLoggingLevel: WARN
readLockMinLength: 1
bridgeErrorHandler: false
maxMessagesPerPoll: "1"
readLockMarkerFile: false
serverAliveCountMax: 1
existDirCheckUsingLs: true
idempotentRepository: "#jdbcClusterIdempotentRepository"
readLockLoggingLevel: DEBUG
readLockCheckInterval: 1000
strictHostKeyChecking: "no"
useUserKnownHostsFile: false
antFilterCaseSensitive: true
readLockRemoveOnCommit: true
eagerMaxMessagesPerPoll: true
readLockRemoveOnRollback: false
sendEmptyMessageWhenIdle: false
serverMessageLoggingLevel: DEBUG
readLockDeleteOrphanLockFiles: true
throwExceptionOnConnectFailed: false
readLockIdempotentReleaseAsync: false
ignoreFileNotFoundOrPermissionError: false
steps:
- log:
id: a1c06e14-0784-4757-927e-627bc7ffb0fd
message: "Файл ${header.CamelFileName} захвачен нодой ${hostname}"
disabled: "false"
loggingLevel: INFO
- delay:
id: ccb5dc07-ab20-4f2d-b343-e6f6c8588e56
disabled: false
constant: null
trim: "true"
expression: "25000"
resultType: java.lang.Long
asyncDelayed: true
callerRunsWhenRejected: true
- log:
id: 0d4ec145-cc19-4d1e-b1f1-8075ceea2c68
message: "Файл ${header.CamelFileName} обработан"
disabled: "false"
loggingLevel: INFO
- to:
id: af5a4ad0-1474-4cee-823c-4617965d591c
uri: sftp
parameters:
host: red-1.dev.local
noop: false
port: "2222"
delay: 500
binary: false
delete: false
greedy: false
flatten: false
preSort: false
shuffle: false
timeout: "30000"
useList: true
download: false
maxDepth: 2147483647
password: pass
readLock: none
sendNoop: true
stepwise: false
timeUnit: MILLISECONDS
username: user
fileExist: Override
recursive: false
scheduler: none
separator: UNIX
soTimeout: "300000"
autoCreate: true
disconnect: false
idempotent: "false"
passiveMode: false
repeatCount: 0
initialDelay: 1000
allowNullBody: false
directoryName: processed
useFixedDelay: true
connectTimeout: "10000"
readLockMinAge: 0
reconnectDelay: "1000"
startScheduler: true
streamDownload: false
fastExistsCheck: true
idempotentEager: true
readLockTimeout: 10000
runLoggingLevel: TRACE
jschLoggingLevel: WARN
keepLastModified: false
lazyStartProducer: false
readLockMinLength: 1
bridgeErrorHandler: false
readLockMarkerFile: false
serverAliveCountMax: 1
existDirCheckUsingLs: true
readLockLoggingLevel: DEBUG
eagerDeleteTargetFile: true
jailStartingDirectory: true
readLockCheckInterval: 1000
strictHostKeyChecking: "no"
useUserKnownHostsFile: false
antFilterCaseSensitive: true
readLockRemoveOnCommit: false
eagerMaxMessagesPerPoll: true
readLockRemoveOnRollback: false
sendEmptyMessageWhenIdle: false
disconnectOnBatchComplete: false
serverMessageLoggingLevel: DEBUG
readLockDeleteOrphanLockFiles: true
throwExceptionOnConnectFailed: false
readLockIdempotentReleaseAsync: false
ignoreFileNotFoundOrPermissionError: false
- Активируйте поток.
Файлы перемещаются из заданной папки incoming в папку processed. Узлы кластера блокируют файлы на время обработки. Если на узле происходит сбой, через заданное время файл разблокируется и может быть обработан любым другим узлом . Блокировка файлов является динамической и зависит от времени обработки файла.
После обработки запись из таблицы блокировки файлов удаляется и блокировка снимается.
Настройка обработки структурированных файлов по частям в кластере
Для обработки структурированных файлов по частям реализован репозиторий с хранением состояния элементов. Репозиторий организован следующим образом:
-
Таблица содержит композитный ключ (имя файла + ID элемента)
-
Добавлен индекс по элементу
-
Файл загружается в RAM целиком для последовательной генерации ID элементов.
Ниже приведены шаги по настройке обработки структурированного файла. Для этого следует сконфигурировать поток, который находит все токены ObjectEvent или AggregationEvent в загруженном xml-файле. Сам файл никуда не перемещается.
Развернутая схема потока приведена на рисунке ниже.

Блоки Logger и Delay являются вспомогательными для процесса и добавляются по необходимости. В приведённом ниже примере они добавлены в сценарий.
Схема потока со свернутыми блоками, входящими в блок Split приведена на рисунке ниже.
Поток включает следующие блоки:
SFTP 🡪 Logger 🡪 Convert Body To 🡪 Logger 🡪 Split 🡪 Logger

Схема блоков, входящих в блок Split приведена на рисунке ниже.
Блок Split включает следующие блоки:
Set Header 🡪 Idempotent Consumer (Logger 🡪 Delay 🡪 Logger)

Чтобы подготовить обработку больших файлов по частям, откройте редактор потоков и настройте поток с конфигурацией, как описано ниже.
-
Добавьте на рабочую область блок SFTP в качестве инициирующего блока. Сконфигурируйте его параметры так же, как в первом примере – для обработки целых файлов. Все критические параметры заполняются так же, за исключением параметра Move – его значение заполнять не надо.
-
Добавьте блоки Logger и блок Delay добавляем и заполняем при необходимости (ниже приведен код роута с логгерами и delay).
Добавьте и сконфигурируйте критически важные блоки.
- Добавьте после SFTP и первого логгера блок Convert Body To.
Установите значение параметра Type: java.Lang.String

- Добавьте блок Split. Установите следующие параметры:
Language: xpath
Expression: //ObjectEvent | //AggregationEvent

- Добавьте блок Set Header первым в блок Split. Установите следующие параметры:
Name: itemIndex
Language: simple
Expression: ${exchangeProperty.CamelSplitIndex}

- Добавьте блок Idempotent Consumer после блока Set Header. Установите следующие параметры:
Language: simple
Expression: ${header.CamelFileName}-${header.itemIndex}
Idempotent Repository: JdbcXmlItemIdempotentRepository

-
Добавьте вспомогательные блоки в блок Idempotent Consumer.
-
Сохраните версию потока.
-
Проверьте критические параметры в коде:
configurationItems:
- route:
id: f2d5bd2a-54d6-4232-ae5b-f44fc9aebf29
from:
id: from-sftp-endpoint
uri: sftp://red-1.dev.local:2222/incoming
parameters:
username: user
password: pass
move: ""
noop: false
port: "2222"
delay: "5000"
binary: false
delete: false
greedy: false
preSort: false
shuffle: false
timeout: "30000"
useList: true
download: true
maxDepth: 2147483647
readLock: none
stepwise: false
timeUnit: MILLISECONDS
recursive: false
scheduler: none
separator: UNIX
soTimeout: "300000"
autoCreate: true
disconnect: false
idempotent: true
passiveMode: false
repeatCount: 0
initialDelay: 1000
useFixedDelay: true
connectTimeout: "10000"
readLockMinAge: 0
reconnectDelay: "1000"
startScheduler: true
streamDownload: false
fastExistsCheck: false
idempotentEager: true
readLockTimeout: 10000
runLoggingLevel: TRACE
jschLoggingLevel: WARN
readLockMinLength: 1
bridgeErrorHandler: false
maxMessagesPerPoll: "1"
readLockMarkerFile: true
serverAliveCountMax: 1
existDirCheckUsingLs: true
idempotentRepository: "#jdbcClusterIdempotentRepository"
readLockLoggingLevel: DEBUG
readLockCheckInterval: 1000
strictHostKeyChecking: "no"
useUserKnownHostsFile: false
antFilterCaseSensitive: true
readLockRemoveOnCommit: true
eagerMaxMessagesPerPoll: true
readLockRemoveOnRollback: true
sendEmptyMessageWhenIdle: false
serverMessageLoggingLevel: DEBUG
readLockDeleteOrphanLockFiles: true
throwExceptionOnConnectFailed: false
readLockIdempotentReleaseAsync: false
ignoreFileNotFoundOrPermissionError: false
steps:
- log:
id: 5c0c4e7a-26a4-43e3-861d-a835065c8fed
message: "Начата обработка файла ${header.CamelFileName}"
disabled: false
loggingLevel: INFO
- convertBodyTo:
id: a3b64981-0087-4551-85f0-2f890d4efd95
type: java.lang.String
disabled: false
mandatory: true
- log:
id: cf6ccf74-1078-4718-ad56-8d109c8d45a0
message: "Размер файла: ${body.length()} символов"
disabled: false
loggingLevel: INFO
- split:
id: ef42344f-23e0-4c7e-8e78-277974aea1ce
disabled: false
delimiter: ","
streaming: false
expression:
xpath: "//ObjectEvent | //AggregationEvent"
synchronous: false
shareUnitOfWork: false
stopOnException: false
parallelProcessing: false
aggregationStrategyMethodAllowNull: false
steps:
- setHeader:
id: 651deecf-78bc-43f4-a1f7-6bc5e72b75bf
name: itemIndex
disabled: false
expression:
simple:
trim: true
expression: "${exchangeProperty.CamelSplitIndex}"
- idempotentConsumer:
id: 59fb19dc-293e-4a6e-93ed-17ea84d19883
eager: true
disabled: false
expression:
simple:
trim: true
expression: "${header.CamelFileName}-${header.itemIndex}"
skipDuplicate: true
completionEager: false
removeOnFailure: true
idempotentRepository: "#jdbcXmlItemIdempotentRepository"
steps:
- log:
id: dc13c07c-53b0-4d48-854c-c6da5c918b81
message: "Начало обработки элемента ${header.itemIndex}"
disabled: false
loggingLevel: INFO
- delay:
id: dc0281ac-271c-4012-a370-f9eb7937fa80
disabled: false
expression:
constant:
trim: true
expression: "500"
resultType: java.lang.Long
asyncDelayed: true
callerRunsWhenRejected: true
- log:
id: 7beb42b3-b73a-4200-ade6-6b713d126520
message: "Элемент ${header.itemIndex} обработан"
disabled: false
loggingLevel: INFO
- log:
id: aea2889d-ddf9-4c43-9463-f2635b88f179
message: "Файл ${header.CamelFileName} полностью обработан"
disabled: false
loggingLevel: INFO
- Активируйте поток.
Узлы кластера берут файлы из папки incoming и блокируют их на время обработки, далее узел последовательно обрабатывает все токены с указанным именем взятого файла (в примере ObjectEvent или AggregationEvent), заносит индекс обработанного токена в БД. Если на обрабатывавшем файл узле в процессе обработки происходит сбой, то по истечении времени блокировки файла обработку начнет любой другой узел с того индекса, где остановился предыдущий узел.