Приложение на основе мультитенантной схемы с r2dbc

Я работаю над многопользовательским реактивным приложением, использующим Spring-Webflux + Spring-data-r2dbc с драйвером r2dbc для подключения к базе данных Postgresql. Мультитенантная часть основана на схеме: одна схема на каждого арендатора. Таким образом, в зависимости от контекста (например, пользователя, вошедшего в систему) запросы будут попадать в определенную схему базы данных.

Я изо всех сил пытаюсь сделать это в r2dbc. В идеале это было бы так, как Hibernate с MultiTenantConnectionProvider (см. Пример 16.3).

Что я нашел и что сделал на данный момент:

  • Можно использовать AbstractRoutingConnectionFactory, как указано, r2dbc-pool < / а>
  • Я посмотрел на . Интересно то, что на prepareConnection есть вызов setSchema(connection):

    private Mono<Void> setSchema(PostgresqlConnection connection) {
        if (this.configuration.getSchema() == null) {
            return Mono.empty();
        }
    
        return connection.createStatement(String.format("SET SCHEMA '%s'", this.configuration.getSchema()))
            .execute()
            .then();
    }
    

Может быть, мне нужно найти способ переопределить это, чтобы динамически получать схему из контекста, а не из конфигурации?

  • В противном случае я мог бы попытаться указать схему в запросе как префикс таблицы:

        String s = "tenant-1";
        databaseClient.execute("SELECT * FROM \"" + s + "\".\"city\"")
                .as(City.class)
                .fetch()
                .all()
    

Но я больше не могу использовать SpringData или мне нужно переопределять каждый запрос, чтобы передать клиента в качестве параметра.

Любые подсказки / помощь приветствуются :)


person Arnaud42    schedule 09.04.2020    source источник


Ответы (4)


Я тоже столкнулся с этим.

Вот чем я сейчас занимаюсь:

  • Опубликуйте PostgresqlConnectionConfigurationBuilder и PostgresqlConnectionFactory как Bean:

    @Bean
    public PostgresqlConnectionConfiguration.Builder postgresqlConnectionConfiguration() {
        return PostgresqlConnectionConfiguration.builder()
                .host("localhost")
                .port(5432)
                .applicationName("team-toplist-service")
                .database("db")
                .username("user")
                .password("password");
    }
    
    @Bean
    @Override
    public PostgresqlConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(postgresqlConnectionConfiguration()
                .build());
    }
    

Чтобы позже (в моем бизнес-методе) создать новый PostgresqlConnectionFactory, используя внедренный экземпляр PostgresqlConnectionConfigurationBuilder - но теперь с установщиком "схемы" также вызывал построитель (после извлечения информации о клиенте из входящего org.springframework.web.reactive.function.server.ServerRequest, который я передал из моего маршрутного компонента.

Мои схемы db следуют шаблону appname_tenantId, поэтому у нас есть «appName», статически настроенное на, например, «app_name», поэтому я получаю имена схем, такие как «app_name_foo_bar123»

Затем у нас есть идентификатор клиента, который в моем случае будет исходить из заголовка запроса, который гарантированно будет установлен сервером apache, находящимся в восходящем потоке (передавая заголовок X-Tenant-Id для входящих запросов, чтобы не полагаться на URL-адреса для выполнения маршрутизация для конкретного арендатора)

Итак, моя «логика» в настоящее время выглядит примерно так:

public Flux<TopTeam> getTopTeams(ServerRequest request) {

    List<String> tenantHeader = request.headers().header("X-Tenant-Id");
    // resolve relevant schema name on the fly
    String schema = (appName+ "_" + tenantHeader.iterator().next()).replace("-", "_");
    System.out.println("Using schema: " + schema);
    // configure connfactory with schema set on the builder
    PostgresqlConnectionFactory cf = new PostgresqlConnectionFactory(postgresqlConnectionConfiguration.schema(schema).build());
    // init new DatabaseClient with tenant specific connection
    DatabaseClient cli = DatabaseClient.create(cf);


        return cli
                .execute("select * from top_teams ").fetch().all()
                .flatMap(map -> {

                    ...
                    });
                });
    }

Эта логика, конечно, может быть абстрагирована, но не уверен, где ее разместить, возможно, это можно переместить в MethodArgumentResolver, чтобы мы могли просто ввести уже настроенный DatabaseClient


ps: это решает проблему мультиарендности только при использовании DatabaseClient. Я не уверен, как это сделать с R2dbcRepositories

person salgmachine    schedule 12.06.2020
comment
По крайней мере, это СУПЕР ПОЛЕЗНО для ddl, потому что Spring DAta R2dbc не выполняет операции DDL, такие как Spring Data JPA. Другими словами, мы можем использовать эту стратегию для DDL и R2dbcRepositories для DML. Потрясающие! - person PaulDev; 25.09.2020

Я создал пример мультитенантности для r2dbc, но с использованием стратегии для каждой базы данных.

Ознакомьтесь с полными примерами кодов здесь.

В некоторых базах данных концепция схемы и базы данных эквивалентна. Если вы продолжаете использовать стратегию для каждой схемы, добавьте SQL в выбор схемы (пожалуйста, исследуйте базу данных, которую вы используете, и определите правильное предложение для установки схемы) при получении соединения.

person Hantsy    schedule 15.09.2020

Спасибо / на основании ответа @charlie carver, вот как я решил эту проблему:

Контроллер:

    @PostMapping(MAP + PATH_DDL_PROC_DB)  //PATH_DDL_PROC_DB = "/database/{db}/{schema}/{table}"
    public Flux<Object> createDbByDb(
            @PathVariable("db") String db,
            @PathVariable("schema") String schema,
            @PathVariable("table") String table) {
        return ddlProcService.createDbByDb(db,schema,table);

Услуга:

    public Flux<Object> createDbByDb(String db,String schema,String table) {
        return ddl.createDbByDb(db,schema,table);
    }

Репозиторий:

    @Autowired
    PostgresqlConnectionConfiguration.Builder connConfig;

    public Flux<Object> createDbByDb(String db,String schema,String table) {
        return createDb(db).thenMany(
                Mono.from(connFactory(connConfig.database(db)).create())
                    .flatMapMany(
                            connection ->
                                    Flux.from(connection
                                                      .createBatch()
                                                      .add(sqlCreateSchema(db))
                                                      .add(sqlCreateTable(db,table))
                                                      .add(sqlPopulateTable(db,table))
                                                      .execute()
                                             )));
    }

    private Mono<Void> createDb(String db) {

        PostgresqlConnectionFactory
                connectionFactory = connFactory(connConfig);

        DatabaseClient ddl = DatabaseClient.create(connectionFactory);

        return ddl
                .execute(sqlCreateDb(db))
                .then();
    }

Класс подключения:

@Slf4j
@Configuration
@EnableR2dbcRepositories
public class Connection extends AbstractR2dbcConfiguration {

    /*
     **********************************************
     * Spring Data jdbc:
     *      DDL: does support JPA.
     *
     * Spring Data R2DBC
     *      DDL:
     *          -does no support JPA
     *          -To achieve DDL, uses R2dbc.DataBaseClient
     *
     *      DML:
     *          -it uses R2dbcREpositories
     *          -R2dbcRepositories is different than
     *          R2dbc.DataBaseClient
     * ********************************************
     */
    @Bean
    public PostgresqlConnectionConfiguration.Builder connectionConfig() {
        return PostgresqlConnectionConfiguration
                .builder()
                .host("db-r2dbc")
                .port(5432)
                .username("root")
                .password("root");
    }

    @Bean
    public PostgresqlConnectionFactory connectionFactory() {
        return
                new PostgresqlConnectionFactory(
                        connectionConfig().build()
                );
    }
}

Скрипты DDL:

@Getter
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DDLScripts {

    public static final String SQL_GET_TASK = "select * from tasks";

    public static String sqlCreateDb(String db) {
        String sql = "create database %1$s;";
        String[] sql1OrderedParams = quotify(new String[]{db});
        String finalSql = format(sql,(Object[]) sql1OrderedParams);
        return finalSql;
    }

    public static String sqlCreateSchema(String schema) {
        String sql = "create schema if not exists %1$s;";
        String[] sql1OrderedParams = quotify(new String[]{schema});
        return format(sql,(Object[])  sql1OrderedParams);
    }

    public static String sqlCreateTable(String schema,String table) {

        String sql1 = "create table %1$s.%2$s " +
                "(id serial not null constraint tasks_pk primary key, " +
                "lastname varchar not null); ";
        String[] sql1OrderedParams = quotify(new String[]{schema,table});
        String sql1Final = format(sql1,(Object[])  sql1OrderedParams);

        String sql2 = "alter table %1$s.%2$s owner to root; ";
        String[] sql2OrderedParams = quotify(new String[]{schema,table});
        String sql2Final = format(sql2,(Object[])  sql2OrderedParams);

        return sql1Final + sql2Final;
    }

    public static String sqlPopulateTable(String schema,String table) {

        String sql = "insert into %1$s.%2$s values (1, 'schema-table-%3$s');";
        String[] sql1OrderedParams = quotify(new String[]{schema,table,schema});
        return format(sql,(Object[]) sql1OrderedParams);
    }

    private static String[] quotify(String[] stringArray) {

        String[] returnArray = new String[stringArray.length];

        for (int i = 0; i < stringArray.length; i++) {
            returnArray[i] = "\"" + stringArray[i] + "\"";
        }
        return returnArray;
    }
}
person PaulDev    schedule 29.09.2020

Спасибо за ответы. В конце концов я пришел к такому решению:

Создайте ConnectionFactory по клиенту / схеме:

public class CloudSpringUtilsConnectionFactoryBuilder implements ConnectionFactoryBuilder {

@Override
public ConnectionFactory buildConnectionFactory(String schema) {
    PostgresqlConnectionConfiguration configuration = getPostgresqlConnectionConfigurationBuilder(schema)
            .build();
    return new PostgresqlConnectionFactory(configuration);
}

@Override
public ConnectionFactory buildSimpleConnectionFactory() {
    PostgresqlConnectionConfiguration configuration = getPostgresqlConnectionConfigurationBuilder(null)
            .build();
    return new PostgresqlConnectionFactory(configuration);
}

protected PostgresqlConnectionConfiguration.Builder getPostgresqlConnectionConfigurationBuilder(String schema) {
    return PostgresqlConnectionConfiguration
            .builder()
            .username(dbUser)
            .password(dbPassword)
            .host(dbHost)
            .port(dbPort)
            .database(dbName)
            .schema(schema);
}

Создайте TenantRoutingConnectionFactory, чтобы получить правильный ConnectionFactory в зависимости от клиента. В нашем случае клиент извлекается из субъекта аутентификации (токен конвертируется в UserProfile):

public class TenantRoutingConnectionFactory extends AbstractRoutingConnectionFactory {

private final DatabaseMigrationService databaseMigrationService;
private final ConnectionFactoryBuilder connectionFactoryBuilder;

private final Map<String, ConnectionFactory> targetConnectionFactories = new ConcurrentHashMap<>();

@PostConstruct
private void init() {
    setLenientFallback(false);
    setTargetConnectionFactories(new HashMap<>());
    setDefaultTargetConnectionFactory(connectionFactoryBuilder.buildConnectionFactory());
}

@Override
protected Mono<Object> determineCurrentLookupKey() {
    return ReactiveSecurityContextHolder.getContext()
            .map(this::getTenantFromContext)
            .flatMap(tenant -> databaseMigrationService.migrateTenantIfNeeded(tenant)
                    .thenReturn(tenant));
}

private String getTenantFromContext(SecurityContext securityContext) {
    String tenant = null;
    Object principal = securityContext.getAuthentication().getPrincipal();
    if (principal instanceof UserProfile) {
        UserProfile userProfile = (UserProfile) principal;
        tenant = userProfile.getTenant();
    }
    ...
    log.debug("Tenant resolved: " + tenant);
    return tenant;
}

@Override
protected Mono<ConnectionFactory> determineTargetConnectionFactory() {
    return determineCurrentLookupKey().map(k -> {
        String key = (String) k;
        if (!targetConnectionFactories.containsKey(key)) {
            targetConnectionFactories.put(key, connectionFactoryBuilder.buildConnectionFactory(key));
        }
        return targetConnectionFactories.get(key);
    });
}

Обратите внимание, что мы используем Flyway в DatabaseMigrationService для создания и переноса схемы для каждого полученного клиента.

person Arnaud42    schedule 01.10.2020