Драйвер Cassandra datastax ResultSet совместно используется несколькими потоками для быстрого чтения

У меня огромные таблицы в кассандре, более 2 миллиардов строк и все больше. В строках есть поле даты, и оно следует шаблону сегмента даты, чтобы ограничить каждую строку.

Даже тогда у меня есть более миллиона записей на конкретную дату.

Я хочу читать и обрабатывать строки за каждый день как можно быстрее. Что я делаю, так это получаю экземпляр com.datastax.driver.core.ResultSet и получаю от него итератор и делюсь этим итератором между несколькими потоками.

Итак, по сути, я хочу увеличить пропускную способность чтения. Это правильный путь? Если нет, пожалуйста, предложите лучший способ.


person RRM    schedule 06.05.2015    source источник


Ответы (1)


К сожалению, вы не можете сделать это как есть. Причина в том, что ResultSet предоставляет внутреннее состояние разбиения по страницам, которое используется для извлечения строк по одной странице за раз.

Однако у вас есть варианты. Поскольку я предполагаю, что вы выполняете запросы диапазона (запросы по нескольким разделам), вы можете использовать стратегию, при которой вы отправляете несколько запросов по диапазонам токенов одновременно, используя директиву токена. Хороший пример описан в разделе Постраничный просмотр неупорядоченных результатов секционирования.

java-driver 2.0.10 и 2.1.5 предоставляют механизм для получения диапазонов токенов с хостов и разделив их. Пример того, как это сделать, есть в интеграционных тестах java-драйвера в TokenRangeIntegrationTest.java#should_expose_token_ranges():

    PreparedStatement rangeStmt = session.prepare("SELECT i FROM foo WHERE token(i) > ? and token(i) <= ?");

    TokenRange foundRange = null;
    for (TokenRange range : metadata.getTokenRanges()) {
        List<Row> rows = rangeQuery(rangeStmt, range);
        for (Row row : rows) {
            if (row.getInt("i") == testKey) {
                // We should find our test key exactly once
                assertThat(foundRange)
                    .describedAs("found the same key in two ranges: " + foundRange + " and " + range)
                    .isNull();
                foundRange = range;
                // That range should be managed by the replica
                assertThat(metadata.getReplicas("test", range)).contains(replica);
            }
        }
    }
    assertThat(foundRange).isNotNull();
}
...
private List<Row> rangeQuery(PreparedStatement rangeStmt, TokenRange range) {
    List<Row> rows = Lists.newArrayList();
    for (TokenRange subRange : range.unwrap()) {
        Statement statement = rangeStmt.bind(subRange.getStart(), subRange.getEnd());
        rows.addAll(session.execute(statement).all());
    }
    return rows;
}

В основном вы можете генерировать свои утверждения и отправлять их асинхронно, приведенный выше пример просто повторяет утверждения по одному за раз.

Другой вариант — использовать spark-cassandra-connector, который, по сути, делает это незаметно и очень эффективно. путь. Я нахожу его очень простым в использовании, и вам даже не нужно настраивать искровой кластер, чтобы использовать его. Подробнее об использовании Java API см. в этом документе.

person Andy Tolbert    schedule 06.05.2015