Мы хотим проверить, что если столбец имеет свойство TTL (time-to-live), он в конечном итоге будет полностью удален из cassandra вместе с пустой строкой, которая его содержала.
Как я понял, алгоритм проверки этого поведения таков
- при сохранении объекта установить TTL для столбца
- подождите, когда истечет время TTL, убедитесь, что возвращаемое значение равно нулю
- подождите, пока пройдет период GC_GRACE_SECONDS
- проверьте, что строка также удаляется
И я не смог проверить последний пункт.
Как я обнаружил (например, здесь или здесь и в других местах), мне нужно выполнить уплотнение. Были подняты подобные вопросы (например, Гектор (Кассандра) Удалить аномалию), но я не не нашел ничего, что помогло бы, да и гугление не сильно помогло.
Итак, вопрос в том, как я могу принудительно сжать мой интеграционный тест (используя hector), чтобы убедиться, что он ведет себя так, как ожидалось? Или есть другие способы сделать это?
P.S. Усечение семейства столбцов недопустимо.
Вот подробности.
Мои тесты:
private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";
private static final int GC_CRACE_SECONDS = 5;
// sut
private CassandraService cassandraService;
// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr",
"localhost:9160");
private Keyspace keyspace;
@BeforeClass
public static void setupBeforeClass() {
EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}
@Before
public void setUp() throws Exception {
keyspace = createKeyspace(KEYSPACE, cluster,
new QuorumAllConsistencyLevelPolicy());
cassandraService = new CassandraService(cluster, KEYSPACE,
COLUMN_FAMILY, GC_CRACE_SECONDS);
}
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
Object obj = "OBJECT";
String rowKey = "key";
String columnName = "columnName";
logger.info("before persisting rows count is {}" + countRows());
cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);
logger.info("after persisting rows count is {}" + countRows());
Object value = retrieve(rowKey, columnName);
assertNotNull(value);
logger.info("before TTL passes rows count is {}" + countRows());
TimeUnit.SECONDS.sleep(6);
Object nullValue = retrieve(rowKey, columnName);
assertNull(nullValue);
logger.info("after TTL passes rows count is {}" + countRows());
TimeUnit.SECONDS.sleep(10);
logger.info("wait 10 more seconds... rows count is {}" + countRows());
System.out.println("================================" + countRows());
TimeUnit.SECONDS.sleep(120);
int countRows = countRows();
logger.info("wait 2 more minutes... rows count is {}" + countRows);
assertEquals(0, countRows);
}
Код для сохранения:
public void persistObjectWithTtl(Object rowKey, Object columnName,
Object obj, int ttl) {
LOGGER.debug("Persist {} / {}", rowKey, columnName);
HColumn<Object, Object> column = createColumn(columnName, obj,
SERIALIZER, SERIALIZER);
column.setTtl(ttl);
executeInsertion(rowKey, column);
}
private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
mutator.addInsertion(rowKey, this.columnFamilyName, column);
mutator.execute();
}
Настройка GcGraceSeconds для семейства столбцов:
private void addColumnFamily(String keySpaceName, String columnFamilyName,
int gcGraceSeconds) {
ColumnFamilyDefinition columnFamilyDefinition =
createColumnFamilyDefinition(keySpaceName, columnFamilyName);
ThriftCfDef columnFamilyWithGCGraceSeconds =
new ThriftCfDef(columnFamilyDefinition);
columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);
cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}
И код для подсчета строк, найденный на SO:
public int countRows() {
int rowCount = 100;
ObjectSerializer serializer = ObjectSerializer.get();
RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
HFactory.createRangeSlicesQuery(keyspace, serializer,
serializer, serializer)
.setColumnFamily(COLUMN_FAMILY)
.setRange(null, null, false, 10)
.setRowCount(rowCount);
Object lastKey = null;
int i = 0;
while (true) {
rangeSlicesQuery.setKeys(lastKey, null);
QueryResult<OrderedRows<Object, Object, Object>> result =
rangeSlicesQuery.execute();
OrderedRows<Object, Object, Object> rows = result.get();
Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();
if (lastKey != null && rowsIterator != null) {
rowsIterator.next();
}
while (rowsIterator.hasNext()) {
Row<Object, Object, Object> row = rowsIterator.next();
lastKey = row.getKey();
i++;
if (row.getColumnSlice().getColumns().isEmpty()) {
continue;
}
}
if (rows.getCount() < rowCount) {
break;
}
}
return i;
}
Спасибо.
Обновлять:
Причина заключалась в том, что объема данных было недостаточно для запуска сжатия, поэтому мне нужно было помещать больше данных и чаще сбрасывать таблицы на диск. Итак, я закончил со следующим тестовым случаем:
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
final int expectedAmount = 50000;
logger.info("before persisting rows count is {}", countRows());
for (int i = 0; i < expectedAmount; i++) {
String rowKey = RandomStringUtils.randomAlphanumeric(128);
Object obj = RandomStringUtils.randomAlphanumeric(1000);
cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);
if (i % 100 == 0) {
StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
}
}
logger.info("causing major compaction...");
StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
logger.info("after major compaction rows count is {}", countRows());
waitAtMost(Duration.TWO_MINUTES)
.pollDelay(Duration.TWO_SECONDS)
.pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
int countRows = countRows();
logger.info("the rows count is {}", countRows);
return countRows < expectedAmount;
}
});
}
полный код: тестовый класс и sut