Потоковая передача данных с использованием Python SDK: преобразование сообщений PubSub в вывод BigQuery

Я пытаюсь использовать поток данных для чтения сообщения pubsub и записи его в большой запрос. Команда Google предоставила мне альфа-доступ, и предоставленные примеры работают, но теперь мне нужно применить их к моему сценарию.

Полезная нагрузка Pubsub:

Message {
    data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
    attributes: {}
}

Схема большого запроса:

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',

Моя цель - просто прочитать полезную нагрузку сообщения и вставить в bigquery. Я изо всех сил пытаюсь разобраться в преобразованиях и том, как мне сопоставить ключи / значения с большой схемой запроса.

Я новичок в этом, поэтому любая помощь приветствуется.

Текущий код: https://codeshare.io/ayqX8w

Спасибо!


person glux    schedule 20.10.2017    source источник


Ответы (3)


Мне удалось успешно проанализировать строку pubsub, определив функцию, которая загружает ее в объект json (см. Parse_pubsub ()). Одна странная проблема, с которой я столкнулся, заключалась в том, что мне не удавалось импортировать json в глобальном масштабе. Я получал ошибку «NameError: глобальное имя json не определено». Мне пришлось импортировать json внутри функции.

Смотрите мой рабочий код ниже:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window

'''Normalize pubsub string to json object'''
# Lines look like this:
  # {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['mac']), (record['status']), (record['datetime'])

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', required=True,
      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
  parser.add_argument(
      '--output_table', required=True,
      help=
      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:
    # Read the pubsub topic into a PCollection.
    lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(parse_pubsub)
                | beam.Map(lambda (mac_bq, status_bq, datetime_bq): {'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq})
                | beam.io.WriteToBigQuery(
                    known_args.output_table,
                    schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
person glux    schedule 24.10.2017
comment
Если вы установите для save_main_session значение true, вы сможете импортировать модули глобально - пример: github.com/GoogleCloudPlatform/python-docs-samples/blob/master/ - person Michael Weber; 02.12.2019

Данные, записываемые в приемник BigQuery Python SDK, должны быть в форме словаря, где каждый ключ словаря дает поле таблицы BigQuery, а соответствующее значение дает значение, которое должно быть записано в это поле. Для типа ЗАПИСЬ BigQuery само значение должно быть словарем с соответствующими парами ключ-значение.

Я подал JIRA для улучшения документации соответствующего модуля python в Beam: https://issues.apache.org/jira/browse/BEAM-3090

person chamikara    schedule 23.10.2017
comment
Спасибо за ответ. После еще нескольких экспериментов выясняется, что входящее сообщение pub / sub поступает в виде строки (очевидно). Мне нужно применить преобразование, которое преобразует объект строк в словарь. В потоке данных я обнаружил следующее сообщение об ошибке: Нарушение подсказки типа ввода в группе: ожидается Tuple [TypeVariable [K], TypeVariable [V]], got ‹type 'unicode'› - person glux; 23.10.2017