Перейти к основному содержимому
Начало работы
Руководство администратора
How To статьи
Установка и настройка
Компоненты
Руководство пользователя

Как передавать большие файлы с помощью блока SFTP

Платформа Bercut ESB позволяет передавать файлы большого размера – до 100 мБ с использованием блока SFTP и других интеграционных блоков. Файлы можно передавать целиком или по частям.

Настройка обработки целых файлов в кластере

Для реализации отказоустойчивой кластерной обработки файлов несколькими узлами создан пользовательский идемпотентный репозиторий.

Репозиторий предназначен для выполнения следующих функций:

  • Блокировать файл на время обработки его узлом

  • Позволять другим узлам переходить к следующему файлу при обнаружении заблокированного.

Если на узле произойдет сбой, то через заданное время файл разблокируется и может быть обработан любым другим узлом, в том числе и тем, на котором произошел сбой, если к моменту восстановления файл не был занят.

Таблица репозитория содержит:

  • Композитный ключ: имя узла + имя файла

  • Индекс по имени файла

Чтобы реализовать обработку больших целых файлов, откройте редактор потоков и настройте поток с конфигурацией, как показано на рисунке ниже. Блоки Logger и Delay являются вспомогательными и не несут на себе функциональную нагрузку по передаче файлов.

Preview

  1. Добавьте в качестве инициирующего блока и сконфигурируйте блок SFTP, как показано на рисунках ниже.

Красным цветом отмечены критически важные настройки, которые должны быть определены. Для строковых параметров приведены примеры заполнения.

  • Host

  • Port

  • Directory name

Preview

  • Idempotent = True

  • Idempotent Eager = True

  • Idempotent Repository = #jdbcClusterIdempotentRepository

  • Max Messages per Poll

  • Move – временная папка, куда перемещаются файлы на время блокировки

Preview

  • Read Lock Remove On Commit = True

Preview

  • Download = True

Preview

  • Stepwise = False

Preview

  • Password

  • Username

  • Use User Known Hosts File = False

Preview

  1. Нажмите Применить.

  2. При необходимости добавьте блоки логгирования Logger и блок временной задержки между обработкой файлов Delay. В каждом блоке сконфигурируйте настройки и примените изменения, нажав Применить.

Примеры конфигурации дополнительных блоков показаны на рисунках ниже.

Первый блок Logger:

Preview

Блок Delay:

Preview

Второй блок Logger:

Preview

  1. Добавьте второй блок SFTP и сконфигурируйте его параметры, как показано на рисунках ниже.

Красным цветом отмечены критически важные настройки, которые должны быть определены. Host

  • Host – такой же, как в первом блоке SFTP

  • Port – такой же, как в первом блоке SFTP

  • Directory name

Preview

  • Idempotent = True

  • Idempotent Eager = True

Preview

  • Password

  • Username

  • Use User Known Hosts File = False

Preview

  1. Нажмите Применить.

  2. Нажмите Сохранить. Версия потока сохранена.

  3. Чтобы проверить код потока, перейдите режим отображения кода, нажав на кнопку Код в правом верхнем углу рабочей области.

Preview

В рабочей области отобразится код потока.

Preview

  1. Проверьте критические параметры в коде:
configurationItems:
- route:
id: 54e3793c-184e-48d0-9d5d-710f892b039c
from:
id: 54e3793c-184e-48d0-9d5d-710f892b039c
uri: sftp
parameters:
host: red-1.dev.local
move: ".processing/${file:name}.${hostname}"
noop: false
port: "2222"
delay: "5000"
binary: false
delete: false
greedy: false
preSort: false
shuffle: false
timeout: "30000"
useList: true
download: true
maxDepth: 2147483647
password: pass
readLock: none
stepwise: false
timeUnit: MILLISECONDS
username: user
recursive: false
scheduler: none
separator: UNIX
soTimeout: "300000"
autoCreate: false
disconnect: false
idempotent: true
passiveMode: false
repeatCount: 0
initialDelay: 1000
directoryName: incoming
useFixedDelay: true
connectTimeout: "10000"
readLockMinAge: 0
reconnectDelay: "1000"
startScheduler: true
streamDownload: false
fastExistsCheck: false
idempotentEager: true
readLockTimeout: 10000
runLoggingLevel: TRACE
jschLoggingLevel: WARN
readLockMinLength: 1
bridgeErrorHandler: false
maxMessagesPerPoll: "1"
readLockMarkerFile: false
serverAliveCountMax: 1
existDirCheckUsingLs: true
idempotentRepository: "#jdbcClusterIdempotentRepository"
readLockLoggingLevel: DEBUG
readLockCheckInterval: 1000
strictHostKeyChecking: "no"
useUserKnownHostsFile: false
antFilterCaseSensitive: true
readLockRemoveOnCommit: true
eagerMaxMessagesPerPoll: true
readLockRemoveOnRollback: false
sendEmptyMessageWhenIdle: false
serverMessageLoggingLevel: DEBUG
readLockDeleteOrphanLockFiles: true
throwExceptionOnConnectFailed: false
readLockIdempotentReleaseAsync: false
ignoreFileNotFoundOrPermissionError: false
steps:
- log:
id: a1c06e14-0784-4757-927e-627bc7ffb0fd
message: "Файл ${header.CamelFileName} захвачен нодой ${hostname}"
disabled: "false"
loggingLevel: INFO
- delay:
id: ccb5dc07-ab20-4f2d-b343-e6f6c8588e56
disabled: false
constant: null
trim: "true"
expression: "25000"
resultType: java.lang.Long
asyncDelayed: true
callerRunsWhenRejected: true
- log:
id: 0d4ec145-cc19-4d1e-b1f1-8075ceea2c68
message: "Файл ${header.CamelFileName} обработан"
disabled: "false"
loggingLevel: INFO
- to:
id: af5a4ad0-1474-4cee-823c-4617965d591c
uri: sftp
parameters:
host: red-1.dev.local
noop: false
port: "2222"
delay: 500
binary: false
delete: false
greedy: false
flatten: false
preSort: false
shuffle: false
timeout: "30000"
useList: true
download: false
maxDepth: 2147483647
password: pass
readLock: none
sendNoop: true
stepwise: false
timeUnit: MILLISECONDS
username: user
fileExist: Override
recursive: false
scheduler: none
separator: UNIX
soTimeout: "300000"
autoCreate: true
disconnect: false
idempotent: "false"
passiveMode: false
repeatCount: 0
initialDelay: 1000
allowNullBody: false
directoryName: processed
useFixedDelay: true
connectTimeout: "10000"
readLockMinAge: 0
reconnectDelay: "1000"
startScheduler: true
streamDownload: false
fastExistsCheck: true
idempotentEager: true
readLockTimeout: 10000
runLoggingLevel: TRACE
jschLoggingLevel: WARN
keepLastModified: false
lazyStartProducer: false
readLockMinLength: 1
bridgeErrorHandler: false
readLockMarkerFile: false
serverAliveCountMax: 1
existDirCheckUsingLs: true
readLockLoggingLevel: DEBUG
eagerDeleteTargetFile: true
jailStartingDirectory: true
readLockCheckInterval: 1000
strictHostKeyChecking: "no"
useUserKnownHostsFile: false
antFilterCaseSensitive: true
readLockRemoveOnCommit: false
eagerMaxMessagesPerPoll: true
readLockRemoveOnRollback: false
sendEmptyMessageWhenIdle: false
disconnectOnBatchComplete: false
serverMessageLoggingLevel: DEBUG
readLockDeleteOrphanLockFiles: true
throwExceptionOnConnectFailed: false
readLockIdempotentReleaseAsync: false
ignoreFileNotFoundOrPermissionError: false
  1. Активируйте поток.

Файлы перемещаются из заданной папки incoming в папку processed. Узлы кластера блокируют файлы на время обработки. Если на узле происходит сбой, через заданное время файл разблокируется и может быть обработан любым другим узлом . Блокировка файлов является динамической и зависит от времени обработки файла.

После обработки запись из таблицы блокировки файлов удаляется и блокировка снимается.

Настройка обработки структурированных файлов по частям в кластере

Для обработки структурированных файлов по частям реализован репозиторий с хранением состояния элементов. Репозиторий организован следующим образом:

  • Таблица содержит композитный ключ (имя файла + ID элемента)

  • Добавлен индекс по элементу

  • Файл загружается в RAM целиком для последовательной генерации ID элементов.

Ниже приведены шаги по настройке обработки структурированного файла. Для этого следует сконфигурировать поток, который находит все токены ObjectEvent или AggregationEvent в загруженном xml-файле. Сам файл никуда не перемещается.

Развернутая схема потока приведена на рисунке ниже.

Preview

Блоки Logger и Delay являются вспомогательными для процесса и добавляются по необходимости. В приведённом ниже примере они добавлены в сценарий.

Схема потока со свернутыми блоками, входящими в блок Split приведена на рисунке ниже.

Поток включает следующие блоки:

SFTP 🡪 Logger 🡪 Convert Body To 🡪 Logger 🡪 Split 🡪 Logger

Preview

Схема блоков, входящих в блок Split приведена на рисунке ниже.

Блок Split включает следующие блоки:

Set Header 🡪 Idempotent Consumer (Logger 🡪 Delay 🡪 Logger)

Preview

Чтобы подготовить обработку больших файлов по частям, откройте редактор потоков и настройте поток с конфигурацией, как описано ниже.

  1. Добавьте на рабочую область блок SFTP в качестве инициирующего блока. Сконфигурируйте его параметры так же, как в первом примере – для обработки целых файлов. Все критические параметры заполняются так же, за исключением параметра Move – его значение заполнять не надо.

  2. Добавьте блоки Logger и блок Delay добавляем и заполняем при необходимости (ниже приведен код роута с логгерами и delay). 

Добавьте и сконфигурируйте критически важные блоки.

  1. Добавьте после SFTP и первого логгера блок Convert Body To.

Установите значение параметра Type: java.Lang.String

Preview

  1. Добавьте блок Split. Установите следующие параметры:

Language: xpath

Expression: //ObjectEvent | //AggregationEvent

Preview

  1. Добавьте блок Set Header первым в блок Split. Установите следующие параметры:

Name: itemIndex

Language: simple

Expression: ${exchangeProperty.CamelSplitIndex}

Preview

  1. Добавьте блок Idempotent Consumer после блока Set Header. Установите следующие параметры:

Language: simple

Expression: ${header.CamelFileName}-${header.itemIndex}

Idempotent Repository: JdbcXmlItemIdempotentRepository

Preview

  1. Добавьте вспомогательные блоки в блок Idempotent Consumer.

  2. Сохраните версию потока.

  3. Проверьте критические параметры в коде:

configurationItems:
- route:
id: f2d5bd2a-54d6-4232-ae5b-f44fc9aebf29
from:
id: from-sftp-endpoint
uri: sftp://red-1.dev.local:2222/incoming
parameters:
username: user
password: pass
move: ""
noop: false
port: "2222"
delay: "5000"
binary: false
delete: false
greedy: false
preSort: false
shuffle: false
timeout: "30000"
useList: true
download: true
maxDepth: 2147483647
readLock: none
stepwise: false
timeUnit: MILLISECONDS
recursive: false
scheduler: none
separator: UNIX
soTimeout: "300000"
autoCreate: true
disconnect: false
idempotent: true
passiveMode: false
repeatCount: 0
initialDelay: 1000
useFixedDelay: true
connectTimeout: "10000"
readLockMinAge: 0
reconnectDelay: "1000"
startScheduler: true
streamDownload: false
fastExistsCheck: false
idempotentEager: true
readLockTimeout: 10000
runLoggingLevel: TRACE
jschLoggingLevel: WARN
readLockMinLength: 1
bridgeErrorHandler: false
maxMessagesPerPoll: "1"
readLockMarkerFile: true
serverAliveCountMax: 1
existDirCheckUsingLs: true
idempotentRepository: "#jdbcClusterIdempotentRepository"
readLockLoggingLevel: DEBUG
readLockCheckInterval: 1000
strictHostKeyChecking: "no"
useUserKnownHostsFile: false
antFilterCaseSensitive: true
readLockRemoveOnCommit: true
eagerMaxMessagesPerPoll: true
readLockRemoveOnRollback: true
sendEmptyMessageWhenIdle: false
serverMessageLoggingLevel: DEBUG
readLockDeleteOrphanLockFiles: true
throwExceptionOnConnectFailed: false
readLockIdempotentReleaseAsync: false
ignoreFileNotFoundOrPermissionError: false
steps:
- log:
id: 5c0c4e7a-26a4-43e3-861d-a835065c8fed
message: "Начата обработка файла ${header.CamelFileName}"
disabled: false
loggingLevel: INFO
- convertBodyTo:
id: a3b64981-0087-4551-85f0-2f890d4efd95
type: java.lang.String
disabled: false
mandatory: true
- log:
id: cf6ccf74-1078-4718-ad56-8d109c8d45a0
message: "Размер файла: ${body.length()} символов"
disabled: false
loggingLevel: INFO
- split:
id: ef42344f-23e0-4c7e-8e78-277974aea1ce
disabled: false
delimiter: ","
streaming: false
expression:
xpath: "//ObjectEvent | //AggregationEvent"
synchronous: false
shareUnitOfWork: false
stopOnException: false
parallelProcessing: false
aggregationStrategyMethodAllowNull: false
steps:
- setHeader:
id: 651deecf-78bc-43f4-a1f7-6bc5e72b75bf
name: itemIndex
disabled: false
expression:
simple:
trim: true
expression: "${exchangeProperty.CamelSplitIndex}"
- idempotentConsumer:
id: 59fb19dc-293e-4a6e-93ed-17ea84d19883
eager: true
disabled: false
expression:
simple:
trim: true
expression: "${header.CamelFileName}-${header.itemIndex}"
skipDuplicate: true
completionEager: false
removeOnFailure: true
idempotentRepository: "#jdbcXmlItemIdempotentRepository"
steps:
- log:
id: dc13c07c-53b0-4d48-854c-c6da5c918b81
message: "Начало обработки элемента ${header.itemIndex}"
disabled: false
loggingLevel: INFO
- delay:
id: dc0281ac-271c-4012-a370-f9eb7937fa80
disabled: false
expression:
constant:
trim: true
expression: "500"
resultType: java.lang.Long
asyncDelayed: true
callerRunsWhenRejected: true
- log:
id: 7beb42b3-b73a-4200-ade6-6b713d126520
message: "Элемент ${header.itemIndex} обработан"
disabled: false
loggingLevel: INFO
- log:
id: aea2889d-ddf9-4c43-9463-f2635b88f179
message: "Файл ${header.CamelFileName} полностью обработан"
disabled: false
loggingLevel: INFO
  1. Активируйте поток.

Узлы кластера берут файлы из папки incoming и блокируют их на время обработки, далее узел последовательно обрабатывает все токены с указанным именем взятого файла (в примере ObjectEvent или AggregationEvent), заносит индекс обработанного токена в БД. Если на обрабатывавшем файл узле в процессе обработки происходит сбой, то по истечении времени блокировки файла обработку начнет любой другой узел с того индекса, где остановился предыдущий узел.