Эластичная опускается

У меня есть стек ELK с Redis. Схема: logstash -> redis -> logstash (индексатор) -> elasticsearch -> kibana

Индексатор Logstash получает данные из Redis и помещает их в эластичный:

    input { 
                redis { 
                        host=>"redis"
                        type=>"redis-input"
                        data_type=>"list"
                        key=>"logstash"
                        } 
                }
        filter { 
                geoip { 
                        source=>"ipaddr"
                        target=>"geoip"
                        database=>"/GeoLiteCity.dat"
                        add_field=>["[geoip][coordinates]","%{[geoip][longitude]}"]
                        add_field=>["[geoip][coordinates]","%{[geoip][latitude]}"]
                        } 
                mutate {
                        remove_field=>["message","@version","timestamp"]
                        convert=>{"[geoip][coordinates]"=>"float"}
                        }
                }
        output { 
                    elasticsearch { 
                                    template=>"/typing-template.json"
                                    template_overwrite=>true
                                    hosts=>["elasticsearch:9200"] 
                                    }
                }

Есть 4 сервера, логи которых я хочу собрать. Вот их конфиг logstash:

input {
    file {
        path => [ "C:/Program Files (x86)/*/logs/*.log", "C:/Program Files (x86)/**/logs/*.log", "C:/Program Files/***/logs/*.log", "C:/Program Files/****/logs/*.log" ]
        start_position => "beginning"
        type => "mtdclog"
        ignore_older => 0
        sincedb_path => "NUL"
            }
}   

filter {
        grok {      match => { "path" => "%{GREEDYDATA}/(?<logdate>[0-9]{8})\.log" }}
        grok {      match => [  "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t'%{NUMBER:account}': (?<event>login) \[ver: (?<client_build>[0-9\.]+)",
                                "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t'%{NUMBER:account}': (?<event>liveupdate) '%{GREEDYDATA:data}'",
                                "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t'%{NUMBER:account}': (?<event>check version)%{GREEDYDATA:data}",
                                "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t'%{NUMBER:account}': %{GREEDYDATA:data}",
                                "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}(?<event>News):%{GREEDYDATA:data}",
                                "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{IPV4:ipaddr}\t(?<event>unknown command) (?<command_code>[A-Z0-9]+)",
                                "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}(?<event>History):%{GREEDYDATA:data}",
                                "message", "%{NONNEGINT:log_stream}\t%{TIME:logtime}\s{1,2}%{GREEDYDATA:log_line}",
                                "message", "%{GREEDYDATA:log_line}"
                                ]   
                    }

        mutate {
            add_field => { "ts" => "%{logdate} %{logtime}"} 
            remove_field => [ "logdate", "logtime" ]
                }

        date {
                match => [ "ts", "YYYYMMdd HH:mm:ss.SSS" ]
                target => "@timestamp"
                }   


        if [path] =~ "Pattern1" { mutate {  add_field => { "dc_type" => "Pattern1" }    }}
        if [path] =~ "Pattern2" { mutate {  add_field => { "dc_type" => "Pattern2" }    }}

        mutate {    remove_field => [ "message", "@version", "ts", "path", "host"  ]    
                    add_field => { "location" => "somecity" }
                    convert => {    "log_stream" => "integer"
                                    "client_build"   => "integer"
                                    "account" => "integer"
                                    }
                    }                               

}

output {

            redis  {
            host => "xxx.yyy.zzz.aaa"
            port => "6381"
            data_type => "list"
            key => "logstash" }

Задача: я хочу обработать старые журналы за 1 месяц. Это около 35 МБ файлов ежедневного журнала. Таким образом, общий объем с 4 серверов составляет около 140 МБ, не так много.

Проблема: Потом запускаю службы logstash - все хорошо, 4-5 часов работает нормально. Я вижу проанализированные данные в кибане и могу с ними работать. Но потом резинка идет вниз. Сообщение: «Тайм-аут запроса через 30000 мс».

Тот же стек ELK, который я использую для других серверов и конфигураций logstash - он отлично работает и обрабатывает больше строк журнала. Но я не могу понять, в чем проблема в этом случае.


person Artyom Degtyarev    schedule 21.06.2016    source источник


Ответы (1)


также я реализовал сопоставления для индексатора logstash:

{
  "template" : "logstash-*",
  "settings" : {
    "index.refresh_interval" : "5s"
  },
  "mappings" : {
    "_default_" : {
      "_all" : {"enabled" : true, "omit_norms" : true},
      "dynamic_templates" : [ {
        "message_field" : {
          "match" : "message",
          "match_mapping_type" : "string",
          "mapping" : {
            "type" : "string", "index" : "analyzed", "omit_norms" : true,
            "fielddata" : { "format" : "disabled" }
          }
        }
      }, {
        "string_fields" : {
          "match" : "*",
          "match_mapping_type" : "string",
          "mapping" : {
            "type" : "string", "index" : "analyzed", "omit_norms" : true,
            "fielddata" : { "format" : "disabled" },
            "fields" : {
              "raw" : {"type": "string", "index" : "not_analyzed", "doc_values" : true, "ignore_above" : 256}
            }
          }
        }
      }, {
        "float_fields" : {
          "match" : "*",
          "match_mapping_type" : "float",
          "mapping" : { "type" : "float", "doc_values" : true }
        }
      }, {
        "double_fields" : {
          "match" : "*",
          "match_mapping_type" : "double",
          "mapping" : { "type" : "double", "doc_values" : true }
        }
      }, {
        "byte_fields" : {
          "match" : "*",
          "match_mapping_type" : "byte",
          "mapping" : { "type" : "byte", "doc_values" : true }
        }
      }, {
        "short_fields" : {
          "match" : "*",
          "match_mapping_type" : "short",
          "mapping" : { "type" : "short", "doc_values" : true }
        }
      }, {
        "integer_fields" : {
          "match" : "*",
          "match_mapping_type" : "integer",
          "mapping" : { "type" : "integer", "doc_values" : true }
        }
      }, {
        "long_fields" : {
          "match" : "*",
          "match_mapping_type" : "long",
          "mapping" : { "type" : "long", "doc_values" : true }
        }
      }, {
        "date_fields" : {
          "match" : "*",
          "match_mapping_type" : "date",
          "mapping" : { "type" : "date", "doc_values" : true }
        }
      }, {
        "geo_point_fields" : {
          "match" : "*",
          "match_mapping_type" : "geo_point",
          "mapping" : { "type" : "geo_point", "doc_values" : true }
        }
      } ],
      "properties" : {
        "@timestamp": { "type": "date", "doc_values" : true },
        "@version": { "type": "string", "index": "not_analyzed", "doc_values" : true },
        "ipaddr": { "type": "ip", "doc_values" : true },
        "ping": {"type": "float", "doc_values" : true },
        "geoip"  : {
          "type" : "object",
          "dynamic": true,
          "properties" : {
            "ip": { "type": "ip", "doc_values" : true },
            "location" : { "type" : "geo_point", "doc_values" : true },
            "latitude" : { "type" : "float", "doc_values" : true },
            "longitude" : { "type" : "float", "doc_values" : true }
          }
        }
      }
    }
  }
}
person Artyom Degtyarev    schedule 21.06.2016