Кассандра + Гектор, принудительное сжатие в тесте, чтобы убедиться, что пустые строки удаляются.

Мы хотим проверить, что если столбец имеет свойство TTL (time-to-live), он в конечном итоге будет полностью удален из cassandra вместе с пустой строкой, которая его содержала.

Как я понял, алгоритм проверки этого поведения таков

  • при сохранении объекта установить TTL для столбца
  • подождите, когда истечет время TTL, убедитесь, что возвращаемое значение равно нулю
  • подождите, пока пройдет период GC_GRACE_SECONDS
  • проверьте, что строка также удаляется

И я не смог проверить последний пункт.

Как я обнаружил (например, здесь или здесь и в других местах), мне нужно выполнить уплотнение. Были подняты подобные вопросы (например, Гектор (Кассандра) Удалить аномалию), но я не не нашел ничего, что помогло бы, да и гугление не сильно помогло.

Итак, вопрос в том, как я могу принудительно сжать мой интеграционный тест (используя hector), чтобы убедиться, что он ведет себя так, как ожидалось? Или есть другие способы сделать это?

P.S. Усечение семейства столбцов недопустимо.


Вот подробности.

Мои тесты:

private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";

private static final int GC_CRACE_SECONDS = 5;

// sut
private CassandraService cassandraService;

// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr", 
    "localhost:9160");

private Keyspace keyspace;

@BeforeClass
public static void setupBeforeClass() {
    EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}

@Before
public void setUp() throws Exception {
    keyspace = createKeyspace(KEYSPACE, cluster, 
        new QuorumAllConsistencyLevelPolicy());
    cassandraService = new CassandraService(cluster, KEYSPACE, 
        COLUMN_FAMILY, GC_CRACE_SECONDS);
}

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    Object obj = "OBJECT";
    String rowKey = "key";
    String columnName = "columnName";
    logger.info("before persisting rows count is {}" + countRows());

    cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);

    logger.info("after persisting rows count is {}" + countRows());

    Object value = retrieve(rowKey, columnName);
    assertNotNull(value);

    logger.info("before TTL passes rows count is {}" + countRows());

    TimeUnit.SECONDS.sleep(6);

    Object nullValue = retrieve(rowKey, columnName);
    assertNull(nullValue);

    logger.info("after TTL passes rows count is {}" + countRows());

    TimeUnit.SECONDS.sleep(10);

    logger.info("wait 10 more seconds... rows count is {}" + countRows());
    System.out.println("================================" + countRows());

    TimeUnit.SECONDS.sleep(120);

    int countRows = countRows();
    logger.info("wait 2 more minutes... rows count is {}" + countRows);
    assertEquals(0, countRows);
}

Код для сохранения:

public void persistObjectWithTtl(Object rowKey, Object columnName, 
        Object obj, int ttl) {
    LOGGER.debug("Persist {} / {}", rowKey, columnName);
    HColumn<Object, Object> column = createColumn(columnName, obj, 
            SERIALIZER, SERIALIZER);
    column.setTtl(ttl);
    executeInsertion(rowKey, column);
}

private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
    Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
    mutator.addInsertion(rowKey, this.columnFamilyName, column);
    mutator.execute();
}

Настройка GcGraceSeconds для семейства столбцов:

private void addColumnFamily(String keySpaceName, String columnFamilyName, 
            int gcGraceSeconds) {
    ColumnFamilyDefinition columnFamilyDefinition = 
        createColumnFamilyDefinition(keySpaceName, columnFamilyName);

    ThriftCfDef columnFamilyWithGCGraceSeconds = 
        new ThriftCfDef(columnFamilyDefinition);
    columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);

    cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}

И код для подсчета строк, найденный на SO:

public int countRows() {
    int rowCount = 100;

    ObjectSerializer serializer = ObjectSerializer.get();
    RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
            HFactory.createRangeSlicesQuery(keyspace, serializer, 
                serializer, serializer)
                    .setColumnFamily(COLUMN_FAMILY)
                    .setRange(null, null, false, 10)
                    .setRowCount(rowCount);

    Object lastKey = null;

    int i = 0;
    while (true) {
        rangeSlicesQuery.setKeys(lastKey, null);

        QueryResult<OrderedRows<Object, Object, Object>> result = 
            rangeSlicesQuery.execute();
        OrderedRows<Object, Object, Object> rows = result.get();
        Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();

        if (lastKey != null && rowsIterator != null) {
            rowsIterator.next();
        }

        while (rowsIterator.hasNext()) {
            Row<Object, Object, Object> row = rowsIterator.next();
            lastKey = row.getKey();
            i++;

            if (row.getColumnSlice().getColumns().isEmpty()) {
                continue;
            }
        }

        if (rows.getCount() < rowCount) {
            break;
        }

    }

    return i;
}

Спасибо.


Обновлять:

Причина заключалась в том, что объема данных было недостаточно для запуска сжатия, поэтому мне нужно было помещать больше данных и чаще сбрасывать таблицы на диск. Итак, я закончил со следующим тестовым случаем:

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    final int expectedAmount = 50000;

    logger.info("before persisting rows count is {}", countRows());

    for (int i = 0; i < expectedAmount; i++) {
        String rowKey = RandomStringUtils.randomAlphanumeric(128);
        Object obj = RandomStringUtils.randomAlphanumeric(1000);
        cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);

        if (i % 100 == 0) {
            StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
        }
    }

    logger.info("causing major compaction...");
    StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
    logger.info("after major compaction rows count is {}", countRows());

    waitAtMost(Duration.TWO_MINUTES)
        .pollDelay(Duration.TWO_SECONDS)
        .pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
        .until(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                int countRows = countRows();
                logger.info("the rows count is {}", countRows);
                return countRows < expectedAmount;
            }
        });
}

полный код: тестовый класс и sut


person Alexey Grigorev    schedule 01.02.2013    source источник


Ответы (1)


Поскольку вы работаете с Java, вы можете легко выполнить сжатие через JMX, используя метод forceTableCompaction(keyspace, columnFamily) компонента org.apache.cassandra.db.StorageService MBean.

person Tyler Hobbs    schedule 02.02.2013
comment
Я пытался подключиться через jconsole и вызвать уплотнение, но строка все еще там. То, что я вижу в журналах, - это уплотнение.CompactionManager - в COLUMN_FAMILY нечего сжимать; используйте forceUserDefinedCompaction, если вы хотите принудительно сжать отдельные sstables (например, для коллекции надгробий) - person Alexey Grigorev; 04.02.2013
comment
Ах, тогда вам просто нужно сначала очистить семейство столбцов. Для этого в том же MBean есть метод JMX: forceTableFlush(keyspace, columnFamily). - person Tyler Hobbs; 04.02.2013
comment
Когда я пытаюсь сначала сбросить семейство столбцов, я получаю то же сообщение. - person Alexey Grigorev; 05.02.2013
comment
Причина в том, что я не ввел достаточно данных. Когда я увеличил число (плюс сброс данных, как вы предложили), я действительно начал видеть, что строки удаляются. Спасибо, Тайлер. Я обновил свой вопрос, включив в него окончательный тестовый пример. - person Alexey Grigorev; 05.02.2013