elasticsearch объединяет некоторые значения в одном поле

У меня есть некоторые необработанные данные

{
  {
    "id":1,
    "message":"intercept_log,UDP,0.0.0.0,68,255.255.255.255,67"
  },
  {
    "id":2,
    "message":"intercept_log,TCP,172.22.96.4,52085,239.255.255.250,3702,1:"
  },
  {
    "id":3,
    "message":"intercept_log,UDP,1.0.0.0,68,255.255.255.255,67"
  },
  {
    "id":4,
    "message":"intercept_log,TCP,173.22.96.4,52085,239.255.255.250,3702,1:"
  }
}

Требование

Я хочу сгруппировать эти данные по значению части сообщения сообщения. Выходное значение такое

{
  {
    "GroupValue":"TCP",
    "DocCount":"2"
  },
  {
    "GroupValue":"UDP",
    "DocCount":"2"
  }
}

Пытаться

  • Я пробовал с этими кодами, но не удалось

GET systemevent*/_search
{
  "size": 0, 
  "aggs": {
    "tags": {
      "terms": {
        "field": "message.keyword",
        "include": " intercept_log[,,](.*?)[,,].*?"
      }
    }
  },
  "track_total_hits": true
}
  • Теперь я пытаюсь использовать конвейеры для удовлетворения этой потребности.
  • ggs, похоже, только группирует поля.
  • У кого-нибудь есть идея получше?

Ссылка на сайт

Агрегация терминов

Обновлять

Моя сцена немного особенная. Я собираю журналы с разных серверов, а затем импортирую журналы в es. Следовательно, существует большая разница между полями сообщений. Если вы напрямую используете операторы скрипта для группировки статистики, это приведет к сбою группы или неточной группировке. Я пытаюсь отфильтровать некоторые данные в соответствии с условиями, а затем использую скрипт для группировки кода операции (код комментария 1), но этот код не может сгруппировать правильные результаты.

Это моя сцена, чтобы добавить:

Наша команда использует es для анализа журнала сервера, использует rsyslog для пересылки данных в серверный центр, а затем использует logstash для фильтрации и извлечения данных в es. В настоящее время в ES есть поле, называемое сообщением, и значением сообщения является подробная информация журнала. В это время нам нужно подсчитать данные, содержащие некоторые значения в сообщении.

код комментария 1

POST systemevent*/_search
{
  "size": 0, 
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "message": {
              "query": "intercept_log"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "protocol": {
      "terms": {
        "script": "def values = /,/.split(doc['message.keyword'].value); return values.length > 1 ? values[1] : 'N/A'",
        "size": 10
      }
    }
  },
  "track_total_hits": true
}

код комментария 2

POST test2/_search
{
  "size": 0,
  "aggs": {
    "protocol": {
      "terms": {
        "script": "def values = /.*,.*/.matcher( doc['host.keyword'].value ); if( name.matches() ) {return values.group(1) } else { return 'N/A' }",
        "size": 10
      }
    }
  }
}

person ChaosFish    schedule 10.11.2020    source источник


Ответы (1)


Самый простой способ решить эту проблему — использовать скрипты в агрегации terms. Скрипт просто разделится на запятые и примет второе значение.

    POST systemevent*/_search
    {
      "size": 0,
      "aggs": {
        "protocol": {
          "terms": {
            "script": "def values = /,/.split(doc['message.keyword'].value); return values.length > 1 ? values[1] : 'N/A';",
            "size": 10
          }
        }
      }
    }

Использовать регулярное выражение

POST test2/_search
{
  "size": 0,
  "aggs": {
    "protocol": {
      "terms": {
        "script": "def m = /.*proto='(.*?)'./.matcher(doc['message.keyword'].value ); if( m.matches() ) { return m.group(1) } else { return 'N/A' }"
      }
    }
  }
}

Результаты будут выглядеть так

  "buckets" : [
    {
      "key" : "TCP",
      "doc_count" : 2
    },
    {
      "key" : "UDP",
      "doc_count" : 2
    }
  ]

введите здесь описание изображения

Лучшим и более эффективным способом было бы разбить поле message на новые поля с помощью конвейера загрузки или Logstash.

person Val    schedule 10.11.2020
comment
Это здорово ! Спасибо за Ваш ответ. но если в нашем поле сообщения нет обычного значения, например, aabbb, мы получим ошибку времени выполнения. Как я могу этого избежать. - person ChaosFish; 10.11.2020
comment
Поскольку вы используете Logstash, мне интересно, почему вы не анализируете поле сообщения перед индексацией документа в ES. Просто пересылать разнородные журналы без их квалификации — не очень хороший подход. То, что я предложил в своем ответе (подход, основанный на сценариях), следует использовать с осторожностью, но если у вас есть способ лучше проанализировать ваши данные до того, как они попадут в ES, вам обязательно следует это сделать. - person Val; 10.11.2020
comment
Мы не решили изменить файл конфигурации logstash, потому что у нас есть только одна возможность развертывания logstash, и у нас есть бесчисленное количество logstash для развертывания, и после развертывания мы не можем выполнить удаленное обновление. Поэтому мы можем предоставить только общий файл конфигурации для logstash. Таким образом, мы можем фильтровать только на стороне ES. Интересно, можно ли агрегировать с помощью обычного в Scipt. - person ChaosFish; 10.11.2020
comment
В Logstash вы определенно можете иметь несколько конфигураций без нескольких экземпляров. Он называется конвейеры, и каждое приложение может иметь свои собственные трубопровод. Если у вас большое разнообразие журналов, поступающих в Logstash/ES, нет смысла позволять ES выполнять всю работу путем потоковой передачи разнородных необработанных данных в ES. - person Val; 10.11.2020
comment
Позвольте мне представить, что я занимаюсь железнодорожной отраслью. Нам нужно собрать данные станционного оборудования. Эти станции разбросаны по всем уголкам нашей страны. Они могут быть в пустыне и девственном лесу. Персонал там понятия не имеет о компьютере. Они не могут предоставить данные станции. Давайте настроим эксклюзивный logstash. Разработчикам требуется много времени, чтобы вернуться к станции и обратно, что затруднит выполнение нашего плана сбора данных. - person ChaosFish; 10.11.2020
comment
Могу ли я получить нужные данные, изменив «код комментария 2» (локальный в описании) - person ChaosFish; 10.11.2020
comment
В этом случае я думаю, что каждая станция должна просто иметь Filebeat. развернут экземпляр, который отправляет все необработанные журналы в централизованный экземпляр Logstash, находящийся под контролем разработчиков. Поверьте мне, если у вас так много станций, передающих данные, последнее, что вам нужно, это чтобы все они передавали свои необработанные необработанные данные в ES. Если вы не контролируете то, что попадет в ES, вам будет очень трудно использовать свои данные. - person Val; 10.11.2020
comment
Нам нужно провести статистический анализ некоторых значений в поле сообщения. Если мы используем logstash для разбора и копирования в es, данные в нашей таблице станут запутанными и трудными для понимания. Если установлено несколько индексов, это приведет к проблемам с управлением индексами, и параллельный анализ ES не рекомендуется. - person ChaosFish; 10.11.2020
comment
Я, очевидно, недостаточно знаю о вашем случае использования. Итак, приступайте к агрегации по сценарию, и в какой-то момент в будущем, как только вы начнете видеть недостатки этого подхода, мы сможем продолжить и поговорить подробнее :-) - person Val; 10.11.2020
comment
Большое спасибо за ваш анализ и предложения. Мы можем продолжить обсуждение в ближайшее время - person ChaosFish; 10.11.2020
comment
Я нашел способ использовать регулярное выражение. Спасибо за вашу помощь! - person ChaosFish; 23.11.2020
comment
Круто, рад, что помогло! - person Val; 23.11.2020