EKS регистрируется в потоке CloudWatch в виде строки

У меня возникла эта проблема, у меня есть кластер EKS, который отправляет журналы в Cloudwatch, а затем Firehose передает журналы в корзину s3.

Моя цель — получить эти журналы с s3 и отправить их в elasticsearch в больших количествах. Я написал лямбда-функцию Python, и она отлично работает, когда журналы представляют собой jsons. Моя проблема в том, что некоторые журналы представляют собой строки или «своего рода» JSON.

Пример:

куб-аутентификатор:

time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."

kube-apiserver :

E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted

Мне интересно, должен ли я попытаться обернуть эти сообщения и преобразовать их в JSON или есть какой-либо способ изменить формат журнала на JSON. Я думал о написании регулярного выражения, но у меня недостаточно знаний о регулярном выражении.


person Amit Baranes    schedule 13.09.2019    source источник
comment
почему s3 между ними? лучше отправить как строку в лямбда-функцию, а затем в ELK,   -  person Adiii    schedule 13.09.2019
comment
Из-за требований клиента я знаю, что этот способ лучше.   -  person Amit Baranes    schedule 13.09.2019
comment
хорошо, так что вы можете прочитать документ из s3 и преобразовать его в строку, прежде чем использовать ELk что-то вроде data.Body.toString('utf-8');   -  person Adiii    schedule 13.09.2019
comment
Журналы в s3 в основном представляют собой jsons, есть небольшое количество строк, которые вызывают у меня эту проблему. Мне нужно преобразовать их в JSON или - попытаться изменить выходной журнал на формант JSON, если это возможно   -  person Amit Baranes    schedule 13.09.2019
comment
Так что, если вы отфильтруете эти конкретные строки или шаблоны журналов при чтении и различении их, а также того, откуда журналы поступают в облачное хранилище?   -  person Adiii    schedule 13.09.2019
comment
Закончилось написанием функции Python, которая обрабатывает эти журналы и форматирует их как JSON.   -  person Amit Baranes    schedule 16.09.2019
comment
О, круто. вы можете опубликовать ответ.   -  person Adiii    schedule 16.09.2019
comment
@Adiii Опубликуйте мой ответ. Спасибо за ваши комментарии.   -  person Amit Baranes    schedule 17.09.2019


Ответы (1)


Как упоминалось в комментариях, в итоге было написано 2 функции, которые обрабатывают журналы и преобразуют их в JSON.

Первый обрабатывает kube-apiserver,kube-controller-manager and kube-scheduler группы журналов:

def convert_text_logs_to_json_and_add_logGroup(message,logGroup):
    month_and_day = message.split(' ')[0][1:]
    month_and_day = insert_dash(month_and_day,2)
    log_time_regex = r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}.\d{1,})"
    log_time = re.findall(log_time_regex, message)[0]
    currentYear = datetime.now().year
    full_log_datetime = "%s-%sT%sZ" %(currentYear,month_and_day,log_time)
    log_contnet = (re.split(log_time_regex,message)[2])
    message = '{"timestamp": "%s", "message":"%s","logGroup" :"%s"}' %(full_log_datetime,log_contnet.replace('"',''),logGroup)
    return message

вторая функция обрабатывает authenticator группу журналов:

def chunkwise(array, size=2):
    it = iter(array)
    return izip(*[it]*size)

def wrap_text_to_json_and_add_logGroup(message,logGroup):
    regex = r"\".*?\"|\w+"
    matches = re.findall(regex, message)
    key_value_pairs = chunkwise(matches)
    json_message= {}
    for key_value in key_value_pairs:
        key = key_value[0]
        if key == 'time':
            key = 'timestamp'
        value = key_value[1].replace('"','')
        json_message[key] = value
    json_message['logGroup'] = logGroup
    log_to_insert = json.dumps(json_message)
    return log_to_insert

Я надеюсь, что эти функции будут полезны для тех, кому может понадобиться вставлять логи из cloudwatch в elasticsearch.

person Amit Baranes    schedule 17.09.2019