Я реализую реактивный проект с Spring boot 2.3.1, Webflux, Spring Data с реактивным драйвером mongodb и Amazon SDk 2.14.6.
У меня есть CRUD, который сохраняет объект в MongoDB и должен загрузить файл на S3. Я использую реактивный метод SDK s3AsyncClient.putObject
и столкнулся с некоторыми проблемами. CompletableFuture создает следующее исключение:
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 60000 millis
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[na:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
reactor.core.publisher.Mono.map(Mono.java:3054)
br.com.wareline.waredrive.service.S3Service.uploadFile(S3Service.java:94)
Файл, который я пытаюсь загрузить, имеет размер около 34 КБ. Это простой текстовый файл.
Метод загрузки находится в моем классе S3Service.java
, который автоматически подключается к DocumentoService.java.
@Component
public class S3Service {
@Autowired
private final ConfiguracaoService configuracaoService;
public Mono<PutObjectResponse> uploadFile(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final String cliente) {
return configuracaoService.findByClienteId(cliente)
.switchIfEmpty(Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, String.format("Configuração com id %s não encontrada", cliente))))
.map(configuracao -> uploadFileToS3(headers, body, fileKey, configuracao))
.doOnSuccess(response -> {
checkResult(response);
});
}
private PutObjectResponse uploadFileToS3(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final Configuracao configuracao) {
final long length = headers.getContentLength();
if (length < 0) {
throw new UploadFailedException(HttpStatus.BAD_REQUEST.value(), Optional.of("required header missing: Content-Length"));
}
final Map<String, String> metadata = new HashMap<>();
final MediaType mediaType = headers.getContentType() != null ? headers.getContentType() : MediaType.APPLICATION_OCTET_STREAM;
final S3AsyncClient s3AsyncClient = getS3AsyncClient(configuracao);
return s3AsyncClient.putObject(
PutObjectRequest.builder()
.bucket(configuracao.getBucket())
.contentLength(length)
.key(fileKey)
.contentType(mediaType)
.metadata(metadata)
.build(),
AsyncRequestBody.fromPublisher(body))
.whenComplete((resp, err) -> s3AsyncClient.close())
.join();
}
public S3AsyncClient getS3AsyncClient(final Configuracao s3Props) {
final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
.readTimeout(Duration.ofMinutes(1))
.writeTimeout(Duration.ofMinutes(1))
.connectionTimeout(Duration.ofMinutes(1))
.maxConcurrency(64)
.build();
final S3Configuration serviceConfiguration = S3Configuration.builder().checksumValidationEnabled(false).chunkedEncodingEnabled(true).build();
return S3AsyncClient.builder()
.httpClient(httpClient)
.region(Region.of(s3Props.getRegion()))
.credentialsProvider(() -> AwsBasicCredentials.create(s3Props.getAccessKey(), s3Props.getSecretKey()))
.serviceConfiguration(serviceConfiguration)
.overrideConfiguration(builder -> builder.apiCallTimeout(Duration.ofMinutes(1)).apiCallAttemptTimeout(Duration.ofMinutes(1)))
.build();
}
Я основывал свою реализацию на документации Amazon SDK и примерах кода по адресу https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/s3/src/main/.java/com/example/s3/S3AsyncOps.java
Я не могу понять, в чем причина проблемы тайм-аута асинхронного клиента. Странно то, что когда я использую тот же S3AsyncClient для загрузки файлов из корзины, он работает. Я безуспешно пытался увеличить время ожидания в S3AsyncClient примерно до 5 минут. Я не знаю, что я делаю неправильно.
Mono.fromFuture
return и вызвать метод uploadFileToS3 из оператора flatMap. - person Martin Tarjányi   schedule 02.09.2020Mono.fromFuture
, как вы предложили, но получил ту же ошибку. - person Ciro Anacleto   schedule 02.09.2020Flux<ByteBuffer> body
, действительно ли он используется aws sdk или просто висит там. Также убедитесь, что вы не подписаны на один и тот же поток до/после загрузки, это может вызвать аналогичную проблему. - person Martin Tarjányi   schedule 02.09.2020