ElasticSearch индексирует новые записи, созданные пользовательским интерфейсом, но записи, созданные файлом liquibase, не индексируются, поэтому они не отображаются в результатах поиска. ElasticSearch должен индексировать все записи, созданные файлами пользовательского интерфейса и liquibase. файлы ликвидаз.
Индекс эластичного поиска
Ответы (2)
Liquibase вносит изменения только в вашу базу данных. Если у вас нет процесса, который прослушивает изменения в базе данных, а затем обновляет Elasticsearch, вы не увидите изменений.
Может быть несколько способов получить записи вашей базы данных в Elasticsearch:
- Ваш пользовательский интерфейс, вероятно, уже вызывает некоторый внутренний код для индексации создания или обновления в Elasticsearch.
- Используйте пакетный процесс, который знает, какие записи были изменены (например, используйте обновленный столбец флагов или столбец updated_timestamp), а затем индексируйте их в Elasticsearch.
Второй вариант можно выполнить в коде с помощью сценария или запланированного внутреннего задания, или вы можете использовать Logstash с подключаемым модулем jdbc-input.
Как Sarwar Bhuiyan и Могсдад грустно
Если у вас нет процесса, который прослушивает изменения в базе данных, а затем обновляет Elasticsearch.
Вы можете использовать liquibase для заполнения elasticsearch (эта задача будет выполнена один раз, как и обычная миграция). Для этого вам необходимо создать customChange:
<customChange class="org.test.ElasticMigrationByEntityName">
<param name="entityName" value="org.test.TestEntity" />
</customChange>
В этой миграции на основе Java вы можете вызывать нужные вам службы. Вот пример того, что вы можете сделать (пожалуйста, не используйте код из этого примера в рабочей версии).
public class ElasticMigrationByEntityName implements CustomTaskChange {
private String entityName;
public String getEntityName() {
return entityName;
}
public void setEntityName(String entityName) {
this.entityName = entityName;
}
@Override
public void execute(Database database) {
//We schedule the task for the next execution. We are waiting for the context to start and we get access to the beans
DelayedTaskExecutor.add(new DelayedTask(entityName));
}
@Override
public String getConfirmationMessage() {
return "OK";
}
@Override
public void setUp() throws SetupException {
}
@Override
public void setFileOpener(ResourceAccessor resourceAccessor) {
}
@Override
public ValidationErrors validate(Database database) {
return new ValidationErrors();
}
/* ===================== */
public static class DelayedTask implements Consumer<ApplicationContext> {
private final String entityName;
public DelayedTask(String entityName) {
this.entityName = entityName;
}
@Override
public void accept(ApplicationContext applicationContext) {
try {
checkedAccept(applicationContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
//We're going to find beans by name (the most controversial point)
private void checkedAccept(ApplicationContext context) throws ClassNotFoundException {
Class entityClass = Class.forName(entityName);
String name = entityClass.getSimpleName();
//Please do not use this code in production
String repositoryName = org.apache.commons.lang3.StringUtils.uncapitalize(name + "Repository");
String repositorySearchName = org.apache.commons.lang3.StringUtils.uncapitalize(name + "SearchRepository");
JpaRepository repository = (JpaRepository) context.getBean(repositoryName);
ElasticsearchRepository searchRepository = (ElasticsearchRepository) context.getBean(repositorySearchName);
//Doing our work
updateData(repository, searchRepository);
}
//Write your logic here
private void updateData(JpaRepository repository, ElasticsearchRepository searchRepository) {
searchRepository.saveAll(repository.findAll());
}
}
}
Поскольку бины еще не созданы, нам придется их дождаться
@Component
public class DelayedTaskExecutor {
@Autowired
private ApplicationContext context;
@EventListener
//We are waiting for the app to launch
public void onAppReady(ApplicationReadyEvent event) {
Queue<Consumer<ApplicationContext>> localQueue = getQueue();
if(localQueue.size() > 0) {
for (Consumer<ApplicationContext> consumer = localQueue.poll(); consumer != null; consumer = localQueue.poll()) {
consumer.accept(context);
}
}
}
public static void add(Consumer<ApplicationContext> consumer) {
getQueue().add(consumer);
}
public static Queue<Consumer<ApplicationContext>> getQueue() {
return Holder.QUEUE;
}
private static class Holder {
private static final Queue<Consumer<ApplicationContext>> QUEUE = new ConcurrentLinkedQueue();
}
}
Пример сущности:
@Entity
@Table(name = "test_entity")
@Document(indexName = "testentity")
public class TestEntity implements Serializable {
private static final long serialVersionUID = 1L;
@Id
@Field(type = FieldType.Keyword)
@GeneratedValue(generator = "uuid")
@GenericGenerator(name = "uuid", strategy = "uuid2")
private String id;
@NotNull
@Column(name = "code", nullable = false, unique = true)
private String code;
...
}