put_records () принимает аргументы ключевого слова только в Kinesis boto3 Python API

from __future__ import print_function # Python 2/3 compatibility
import boto3
import json
import decimal

#kinesis = boto3.resource('kinesis', region_name='eu-west-1')
client = boto3.client('kinesis')
with open("questions.json") as json_file:
    questions = json.load(json_file)
    Records = []
    count = 0
    for question in questions:
        value1 = question['value']
        if value1 is None:
            value1 = '0'
        record = { 'StreamName':'LoadtestKinesis', 'Data':b'question','PartitionKey':'value1' }
        Records.append(record)
        count +=1
        if count == 500:
            response = client.put_records(Records)
            Records = []

Это мой сценарий Python для загрузки массива файлов json в поток kinesis, где я объединяю 500 записей для использования функции put_records. Но я получаю сообщение об ошибке: put_records() only accepts keyword arguments. Как передать этому методу список записей? Каждая запись представляет собой json с ключом раздела.

Пример Json:

[{
        "air_date": "2004-12-31",
        "answer": "FDDDe",
        "category": "AACC",
        "question": "'No. 2: 1912 Olympian; football star at Carlisle Indian School; 6 MLB seasons with the Reds, Giants & Braves'",
        "round": "DDSSS!",
        "show_number": "233",
        "value": "$200"
    }]

person Anshuman Ranjan    schedule 27.05.2016    source источник


Ответы (2)


    from __future__ import print_function # Python 2/3 compatibility
    import boto3
    import json
    import decimal
    import time


    def putdatatokinesis(RecordKinesis):
        start = time.clock()
        response = client.put_records(Records=RecordKinesis, StreamName='LoadtestKinesis')
        print ("Time taken to process" +  len(Records) + " is " +time.clock() - start)
        return response
client = boto3.client('kinesis')
firehoseclient = boto3.client('firehose')
with open("questions.json") as json_file:
    questions = json.load(json_file)
    Records = []
    RecordKinesis = []
    count = 0
    for question in questions:
        value1 = question['value']
        if value1 is None:
            value1 = '0'
        recordkinesis = { 'Data':b'question','PartitionKey':value1 }
        RecordKinesis.append(recordkinesis)
        Records.append(record)
        count +=1
        if count == 500:
            putdatatokinesis(RecordKinesis)
            Records = []
            RecordKinesis = []

Это сработало. Идея состоит в том, чтобы передать записи аргумента в качестве аргумента с ключом.

person Anshuman Ranjan    schedule 27.05.2016

При передаче нескольких записей вам необходимо инкапсулировать записи в список записей, а затем добавить идентификатор потока.

Формат такой:

{
   "Records": [ 
      { 
         "Data": blob,
         "ExplicitHashKey": "string",
         "PartitionKey": "string"
      },
      {
         "Data": "another record",
         "ExplicitHashKey": "string",
         "PartitionKey": "string"
      }
   ],
   "StreamName": "string"
}

Дополнительную информацию см. В документации Kinesis.

person vageli    schedule 27.05.2016
comment
Что делать, если у меня миллионы записей, и я не могу вручную записывать каждую из них в записи? Я искал зацикливаться и добавлять каждую запись в список. У Kinesis лучшая производительность - 500 записей на пакет, поэтому мне нужен способ добавить 500 записей за один раз. - person Anshuman Ranjan; 27.05.2016
comment
@AnshumanRanjany, вы все еще можете выполнять пакетную обработку записей. Вам просто нужно немного изменить свой код. - person vageli; 27.05.2016