Обработка повторяющихся записей в Elasticsearch

Я использую стек Hadoop + ELK для создания аналитического стека. Я пытаюсь ежедневно обновлять индекс.

Я использую сторонние данные в формате CSV. Я не контролирую входные данные, т. Е. Не могу попросить изменить схему для CSV-файла.

Проблема в том, что в записях CSV нет уникального идентификатора, или даже объединение столбцов для создания уникального идентификатора также не сработает, поэтому при обновлении Elasticsearch в индекс добавляются повторяющиеся данные.

Итак, если данные первого дня похожи на

Product1,Language1,Date1,$1
Product2,Language2,Date1,$12

Данные дня 2 становятся

Product1,Language1,Date1,$1
Product2,Language2,Date1,$12
Product1,Language1,Date1,$1
Product2,Language2,Date1,$12
Product3,Language1,Date2,$5(new record added on day2)

Есть ли хороший способ справиться с этим в ELK. Я использую Logstash для использования файлов csv.


person Crypt    schedule 15.07.2015    source источник


Ответы (2)


Думаю, все дело в документе "_id".

Если бы у вас был уникальный «_id» для каждого документа, не было бы проблем, так как вы просто «обновите» документ до того же значения. Вы даже можете настроить сопоставление, чтобы не разрешать обновление, если необходимо.

Ваша проблема в том, что вы не связываете "_id" документа с содержимым документа (что в некоторых случаях нормально).

Я предполагаю, что простым решением было бы создать собственное поле «my_id» и установить для него путь «_id», например здесь.

Тогда проблема заключается в том, как создать это поле «my_id». Я бы использовал хеш для документа.

Примером фрагмента кода Python для этого может быть (я уверен, что вы могли бы найти подходящий плагин Ruby):

import hashlib
hash_object = hashlib.sha1(b"Product2,Language2,Date1,$12")
hex_dig = hash_object.hexdigest()
print(hex_dig)
person eran    schedule 15.07.2015
comment
Спасибо @eran за ответ. Но, как я уже упоминал, комбинация Product2, Language2, Date1, $ 12 также не уникальна. Поэтому для меня невозможно создать поле id из этих полей, которое было бы уникальным. - person Crypt; 16.07.2015
comment
Хммм ... У вас есть доступ к номеру строки? В противном случае это не проблема ElasticSearch. У вас будет такая же проблема с любой базой данных, - person eran; 16.07.2015
comment
Я знаю, что это не проблема Elasticsearch. Я вижу только один выход. Повторное создание индекса каждый раз, что немного дорого. Искал что-то, что кто-то мог сделать раньше. - person Crypt; 16.07.2015
comment
Как насчет добавления номера строки к записи перед ее записью и использования этого идентификатора документа этапа номера? - person eran; 16.07.2015
comment
Я тоже считал это, но вставка может произойти в любой позиции в файле csv, так что это нарушит всю позицию. Скажем, я нумерую ее 123, теперь новая запись вставлена ​​между 2 и 3, так что весь порядок будет неправильным. - person Crypt; 16.07.2015
comment
Я бы порекомендовал вам сравнить два файла cvs, а затем учесть разницу; поэтому в первый день вы добавите два документа в ES (Product1 ad Product2), а затем во второй вы добавите только один документ в ES (Product3); Вы можете найти способ сравнить файл cvs, погуглив. - person Viswesn; 17.07.2015

Я считаю, что первая часть решения будет заключаться в том, чтобы определить набор значений, которые при совместном использовании будут уникальными для документа. В противном случае невозможно отделить дубликаты документов от подлинных. Для обсуждения предположим, что четыре значения (Product1, Language1, Date1, $ 1) определяют документ. Если существует другой документ с такими же заданными значениями, то это копия предыдущего документа, а не новый документ.

Предположим, у вас есть (Product1, Language1, Date1, $ 1), вы можете сначала выполнить запрос, который выполняет поиск, существует ли этот документ уже в ElasticSearch или нет. Что-то типа:

{
"filter": {
    "bool": {
        "must": [
            {
                "term": {
                    "pdtField": "Product1"
                }
            },
            {
                "term": {
                    "langField": "Language1"
                }
            },
            {
                "term": {
                    "dateField": "Date1"
                }
            },
            {
                "term": {
                    "costField": "$1"
                }
            }
        ]
    }
}
}

Позаботьтесь об именах используемых здесь полей в соответствии с тем, что вы на самом деле используете. Если этот результат фильтра имеет doc_count != 0, вам не нужно создавать для этого новый документ. В противном случае создайте новый документ с имеющимися значениями.

В качестве альтернативы вы можете создать идентификатор документа, используя хэш, созданный из (Product1, Language1, Date1, $ 1), а затем использовать этот хеш в качестве _id документа. Сначала проверьте, существует ли какой-либо документ с этим _id. Если он не существует, создайте новый документ со значениями в руке против значения _id, сгенерированного хешем.

В случае, если у вас нет контроля над тем, как создается отдельный документ, тогда, возможно, вы можете попробовать предварительно обработать свой ввод CSV, используя стратегию, рекомендованную выше, оставить только необходимые записи в CSV и избавиться от остальных, а затем продолжить как обычно с полученным CSV.

person Ashish Goel    schedule 24.12.2015