Итерационный запрос Spark Cassandra

Я применяю следующее через Spark Cassandra Connector:

val links = sc.textFile("linksIDs.txt")
links.map( link_id => 
{ 
val link_speed_records = sc.cassandraTable[Double]("freeway","records").select("speed").where("link_id=?",link_id)
average = link_speed_records.mean().toDouble
})

Я хотел бы спросить, есть ли способ более эффективно применить указанную выше последовательность запросов, учитывая, что единственный параметр, который я всегда меняю, - это 'link_id'.

Значение 'link_id' - это единственный ключ раздела в моей таблице «записи» Cassandra. Я использую Cassandra v.2.0.13, Spark v.1.2.1 и Spark-Cassandra Connector v.1.2.1

Я подумал, можно ли открыть сеанс Cassandra, чтобы применить эти запросы и при этом получить «link_speed_records» как SparkRDD.


person raschild    schedule 12.07.2015    source источник
comment
Мне любопытно, как вы можете запускать опубликованный вами код без получения NPE из-за того, что sc недоступен для рабочих из RDD.   -  person Metropolis    schedule 14.07.2015
comment
Все запросы (запросы) отправлялись через искровый драйвер с использованием искрового контекста. Рабочие несли единоличную ответственность за вычисление полученного CassandraRDD. Следовательно, не было необходимости в том, чтобы СК был доступен рабочим.   -  person raschild    schedule 14.07.2015


Ответы (1)


Используйте метод joinWithCassandra, чтобы использовать RDD ключей для извлечения данных из таблицы Cassandra. Метод, указанный в вопросе, будет сравнительно чрезвычайно дорогим, а также не будет хорошо работать как параллелизируемый запрос.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

person RussS    schedule 12.07.2015
comment
Большое тебе спасибо!! Это именно тот случай, задержка была достаточно высокой, и это не имело большого смысла для параллельных запросов. - person raschild; 12.07.2015