реактивные микропрофильные клиентские блоки REST на Quarkus

На Quarkus я пытаюсь создать реактивный микропрофильный REST-клиент с динамическим baseUrl, но пока во всех вариантах реализации вызовы REST блокируются после подписки. Что интересно, безреактивная реализация работает как шарм. Посмотрим код ...

Клиентский интерфейс REST:

package ...;

import io.smallrye.mutiny.Uni;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.util.concurrent.CompletionStage;

import static javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM;

@Path("")
public interface RetrievalRestApi {

  @GET
  @Produces(APPLICATION_OCTET_STREAM)
  Uni<InputStream> retrieve();

  @GET
  @Produces(APPLICATION_OCTET_STREAM)
  InputStream retrieve2(); // non-reactive, the only one that works...

  @GET
  @Produces(APPLICATION_OCTET_STREAM)
  Uni<Response> retrieve3();

  @GET
  @Produces(APPLICATION_OCTET_STREAM)
  CompletionStage<InputStream> retrieve4();
}

Кваркус-тест:

package ...;

import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.mutiny.Uni;
import org.apache.commons.io.IOUtils;
import org.eclipse.microprofile.rest.client.RestClientBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import javax.inject.Inject;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertNotNull;

@QuarkusTest
class RetrievalRestApiTest {
  private URL restUrl;

  @BeforeEach
  void setUp() throws MalformedURLException {
    restUrl = new URL("https://some-valid-url");
  }

  @Test
  void testRetrieve() {
    Uni<InputStream> uni = RestClientBuilder.newBuilder()
                                            .baseUrl(restUrl)
                                            .build(RetrievalRestApi.class)
                                            .retrieve()
                                            .onFailure().invoke((Consumer<Throwable>) System.out::println);
    InputStream inputStream = uni.subscribe()
                                 .withSubscriber(UniAssertSubscriber.create())
                                 .await()
                                 .assertCompleted()
                                 .getItem();
    assertNotNull(inputStream);
  }

  @Test
  void testRetrieve2() throws IOException {
    InputStream inputStream = RestClientBuilder.newBuilder()
                                               .baseUrl(restUrl)
                                               .build(RetrievalRestApi.class)
                                               .retrieve2();
    assertNotNull(inputStream);
    String content = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    assertNotNull(content);
  }

  @Test
  void testRetrieve3() {
    Uni<Response> uni = RestClientBuilder.newBuilder()
                                         .baseUrl(restUrl)
                                         .build(RetrievalRestApi.class)
                                         .retrieve3()
                                         .onFailure().invoke((Consumer<Throwable>) System.out::println);
    Response response = uni.subscribe()
                                 .withSubscriber(UniAssertSubscriber.create())
                                 .await()
                                 .assertCompleted()
                                 .getItem();
    assertNotNull(response);
  }

  @Test
  void testRetrieve4() {
    Uni<InputStream> uni = Uni.createFrom().completionStage(RestClientBuilder.newBuilder()
                                         .baseUrl(restUrl)
                                         .build(RetrievalRestApi.class)
                                         .retrieve4())
                                         .onFailure().invoke((Consumer<Throwable>) System.out::println);
    InputStream inputStream = uni.subscribe()
                                 .withSubscriber(UniAssertSubscriber.create())
                                 .await()
                                 .assertCompleted()
                                 .getItem();
    assertNotNull(inputStream);
  }
}

При выполнении 4 тестов успешно выполняется только testRetrieve2, использующий нереактивный API. Все остальные зависают в AssertSubscriber.await ().

Любые идеи?


person Pat    schedule 05.02.2021    source источник
comment
Проблема, похоже, связана с трассировкой с помощью Quarkus Jaeger. Как только я отключаю трассировку с помощью quarkus.jaeger.enabled=false, больше нет блокировки. Возможно, это связано с github.com/quarkusio/quarkus/issues/13440, потому что я увидеть почти идентичную трассировку стека (разница, вероятно, из-за другой версии Quarkus).   -  person Pat    schedule 08.02.2021


Ответы (1)