Примеры Dataproc + BigQuery - есть ли в наличии?

Согласно docos Dataproc, он имеет «встроенную и автоматическую интеграцию с BigQuery ".

У меня есть таблица в BigQuery. Я хочу прочитать эту таблицу и выполнить некоторый анализ, используя созданный мною кластер Dataproc (используя задание PySpark). Затем запишите результаты этого анализа обратно в BigQuery. Вы можете спросить: «Почему бы просто не провести анализ напрямую в BigQuery !?» - причина в том, что мы создаем сложные статистические модели, а SQL слишком высок для их разработки. Нам нужно что-то вроде Python или R, следовательно, Dataproc.

Доступны ли какие-либо примеры Dataproc + BigQuery? Я ничего не могу найти.


person Graham Polley    schedule 06.10.2015    source источник


Ответы (2)


Для начала, как указано в этом вопросе, коннектор BigQuery предварительно установлен на Cloud Dataproc.

Вот пример того, как читать данные из BigQuery в Spark. В этом примере мы будем читать данные из BigQuery для подсчета слов. Вы читаете данные из BigQuery в Spark, используя SparkContext.newAPIHadoopRDD. В документации Spark есть больше информации об использовании SparkContext.newAPIHadoopRDD. '

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject

import org.apache.hadoop.io.LongWritable

val projectId = "<your-project-id>"
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>"
val outputTableSchema =
    "[{'name': 'Word','type': 'STRING'},{'name': 'Count','type': 'INTEGER'}]"
val jobName = "wordcount"

val conf = sc.hadoopConfiguration

// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)

// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)

// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
BigQueryConfiguration.configureBigQueryOutput(conf,
    fullyQualifiedOutputTableId, outputTableSchema)

val fieldName = "word"

val tableData = sc.newAPIHadoopRDD(conf,
    classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.cache()
tableData.count()
tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)

Вам нужно будет настроить этот пример со своими настройками, включая идентификатор проекта Cloud Platform в <your-project-id> и идентификатор выходной таблицы в <your-fully-qualified-table-id>.

Наконец, если вы в конечном итоге используете коннектор BigQuery с MapReduce, эта страница есть примеры написания заданий MapReduce с помощью коннектора BigQuery.

person James    schedule 09.10.2015
comment
есть ли какая-нибудь версия этого C #? - person PUG; 25.02.2016
comment
Я просто хочу отметить, что в полной документации есть также объяснение того, как выполнить очистку, иначе промежуточные файлы экспорта останутся в GCS. - person Dennis Huo; 24.03.2016


В приведенном выше примере не показано, как записывать данные в выходную таблицу. Вам нужно сделать это:

.saveAsNewAPIHadoopFile(
hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY), 
classOf[String], 
classOf[JsonObject], 
classOf[BigQueryOutputFormat[String, JsonObject]], hadoopConf)

где ключ: String фактически игнорируется

person lukeforehand    schedule 02.11.2015