Spring Webflux и Amazon SDK 2.x: тайм-аут S3AsyncClient

Я реализую реактивный проект с Spring boot 2.3.1, Webflux, Spring Data с реактивным драйвером mongodb и Amazon SDk 2.14.6.

У меня есть CRUD, который сохраняет объект в MongoDB и должен загрузить файл на S3. Я использую реактивный метод SDK s3AsyncClient.putObject и столкнулся с некоторыми проблемами. CompletableFuture создает следующее исключение:

java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 60000 millis
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[na:na]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
    reactor.core.publisher.Mono.map(Mono.java:3054)
    br.com.wareline.waredrive.service.S3Service.uploadFile(S3Service.java:94)

Файл, который я пытаюсь загрузить, имеет размер около 34 КБ. Это простой текстовый файл.

Метод загрузки находится в моем классе S3Service.java, который автоматически подключается к DocumentoService.java.

@Component
public class S3Service {

    @Autowired
    private final ConfiguracaoService configuracaoService;

    public Mono<PutObjectResponse> uploadFile(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final String cliente) {
        return configuracaoService.findByClienteId(cliente)
                .switchIfEmpty(Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, String.format("Configuração com id %s não encontrada", cliente))))
                .map(configuracao -> uploadFileToS3(headers, body, fileKey, configuracao))
                .doOnSuccess(response -> {
                    checkResult(response);
                });
    }

    private PutObjectResponse uploadFileToS3(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final Configuracao configuracao) {

        final long length = headers.getContentLength();
        if (length < 0) {
            throw new UploadFailedException(HttpStatus.BAD_REQUEST.value(), Optional.of("required header missing: Content-Length"));
        }
        final Map<String, String> metadata = new HashMap<>();
        final MediaType mediaType = headers.getContentType() != null ? headers.getContentType() : MediaType.APPLICATION_OCTET_STREAM;

        final S3AsyncClient s3AsyncClient = getS3AsyncClient(configuracao);

        return s3AsyncClient.putObject(
                PutObjectRequest.builder()
                        .bucket(configuracao.getBucket())
                        .contentLength(length)
                        .key(fileKey)
                        .contentType(mediaType)
                        .metadata(metadata)
                        .build(),
                AsyncRequestBody.fromPublisher(body))
                .whenComplete((resp, err) -> s3AsyncClient.close())
                .join();
    }

    public S3AsyncClient getS3AsyncClient(final Configuracao s3Props) {

        final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
            .readTimeout(Duration.ofMinutes(1))
            .writeTimeout(Duration.ofMinutes(1))
            .connectionTimeout(Duration.ofMinutes(1))
            .maxConcurrency(64)
            .build();

        final S3Configuration serviceConfiguration = S3Configuration.builder().checksumValidationEnabled(false).chunkedEncodingEnabled(true).build();

        return S3AsyncClient.builder()
            .httpClient(httpClient)
            .region(Region.of(s3Props.getRegion()))
            .credentialsProvider(() -> AwsBasicCredentials.create(s3Props.getAccessKey(), s3Props.getSecretKey()))
            .serviceConfiguration(serviceConfiguration)
            .overrideConfiguration(builder -> builder.apiCallTimeout(Duration.ofMinutes(1)).apiCallAttemptTimeout(Duration.ofMinutes(1)))
            .build();

    }

Я основывал свою реализацию на документации Amazon SDK и примерах кода по адресу https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/s3/src/main/.java/com/example/s3/S3AsyncOps.java

Я не могу понять, в чем причина проблемы тайм-аута асинхронного клиента. Странно то, что когда я использую тот же S3AsyncClient для загрузки файлов из корзины, он работает. Я безуспешно пытался увеличить время ожидания в S3AsyncClient примерно до 5 минут. Я не знаю, что я делаю неправильно.


person Ciro Anacleto    schedule 02.09.2020    source источник
comment
Не уверен, что это проблема, но вы не используете AWS sdk реактивно. Когда вы вызываете join, вы эффективно блокируете поток. Вместо этого вы должны обернуть CompletableFuture с помощью Mono.fromFuturereturn и вызвать метод uploadFileToS3 из оператора flatMap.   -  person Martin Tarjányi    schedule 02.09.2020
comment
Я уже пытался обернуть completableFuture в Mono.fromFuture, как вы предложили, но получил ту же ошибку.   -  person Ciro Anacleto    schedule 02.09.2020
comment
Затем в качестве следующего шага я бы проверил Flux<ByteBuffer> body, действительно ли он используется aws sdk или просто висит там. Также убедитесь, что вы не подписаны на один и тот же поток до/после загрузки, это может вызвать аналогичную проблему.   -  person Martin Tarjányi    schedule 02.09.2020


Ответы (1)


Я нашел ошибку. Когда я определяю contentLength в PutObjectRequest.builder().contentLength(length), я использую headers.getContentLength(), который является размером всего запроса. В моем запросе другая информация передается вместе, в результате чего длина содержимого превышает реальную длину файла.

Я нашел это в документации Amazon:

Количество байтов, заданное в заголовке Content-Length, превышает фактический размер файла.

Когда вы отправляете HTTP-запрос в Amazon S3, Amazon S3 ожидает получить объем данных, указанный в заголовке Content-Length. Если ожидаемый объем данных не получен Amazon S3, а соединение простаивает в течение 20 секунд или дольше, соединение закрывается. Убедитесь, что фактический размер файла, который вы отправляете в Amazon S3, соответствует размеру файла, указанному в заголовке Content-Length.

https://aws.amazon.com/pt/premiumsupport/knowledge-center/s3-socket-connection-timeout-error/

Ошибка тайм-аута произошла из-за того, что S3 ожидает, пока длина отправленного содержимого не достигнет размера, указанного в клиенте, файл завершает передачу до достижения указанной длины содержимого. Затем соединение остается бездействующим, и S3 закрывает сокет.

Я изменил длину содержимого на реальный размер файла, и загрузка прошла успешно.

person Ciro Anacleto    schedule 04.09.2020