Pyspark сохраняет RDD для Кассандры

У меня есть RDD (test_rdd), как показано ниже

[
{'user_lname': u'TEst1', 'user_id': u'2aa8ae30-c0e5-48bb-ab16-a2ed2e78c8c3', 'user_phone': u'1234567890', 'user_fname': u'TestingTesting2', 'amount': 1222,’event_timestamp': u’2016-09-29T07:49:50.866+00:00’}, 

{'user_lname': u'TEst2', 'user_id': u'2aa8ae30-c0e5-48bb-ac16-a2ed2e78c8c3', 'user_phone': u'1234567891', 'user_fname': u'TestingTesting', 'amount': 12,’event_timestamp': u’2016-10-27T07:49:50.866+00:00’},

{'user_lname': u'TEst3', 'user_id': u'2aa8ae30-c1e5-48bb-ab16-a2ed2e78c8c3', 'user_phone': u'1234567892', 'user_fname': u'TestingTesting3', 'amount': 122,’event_timestamp': u’2016-09-27T07:49:50.866+00:00'}
]

Я хочу сохранить указанный выше RDD в таблице cassandra.
Я получаю следующую ошибку, когда использую

test_rdd.saveToCassandra("keyspace1","table1")  

Traceback (последний вызов последним): файл «/var/spark/test/k.py», строка 179, в
parsed_data.saveToCassandra («keyspace1», «table1») AttributeError: объект 'PipelinedRDD' не имеет атрибута 'saveToCassandra'


person Santhavathi Sivakumaran    schedule 25.11.2016    source источник


Ответы (1)


Или

  • следуйте инструкциям для pyspark-cassandra
  • и import pyspark_cassandra

or

  • следуйте инструкциям для официального _3 _
  • преобразовать в DataFrame (toDF)
  • написать Dataframe

    df.write.format("org.apache.spark.sql.cassandra").options(
      table=table, keyspace=keyspace
    ).save()
    
person Community    schedule 25.11.2016
comment
Спасибо. Я использовал 2-й способ. В чем разница между pyspark-cassandra и spark-cassandra-connector. - person Santhavathi Sivakumaran; 29.11.2016