Я пытаюсь создать партии строк Dataset
в Spark. Для поддержания количества записей, отправленных в службу, я хочу группировать элементы, чтобы я мог поддерживать скорость, с которой будут отправляться данные. За,
case class Person(name:String, address: String)
case class PersonBatch(personBatch: List[Person])
Для данного Dataset[Person]
я хочу создать Dataset[PersonBatch]
Например, если вход Dataset[Person]
содержит 100 записей, вывод Dataset
должен быть похож на Dataset[PersonBatch]
, где каждый PersonBatch
должен быть списком из n
записей (человек).
Я пробовал это, но это не сработало.
object DataBatcher extends Logger {
var batchList: ListBuffer[PersonBatch] = ListBuffer[PersonBatch]()
var batchSize: Long = 500 //default batch size
def addToBatchList(batch: PersonBatch): Unit = {
batchList += batch
}
def clearBatchList(): Unit = {
batchList.clear()
}
def createBatches(ds: Dataset[Person]): Dataset[PersonBatch] = {
val dsCount = ds.count()
logger.info(s"Count of dataset passed for creating batches : ${dsCount}")
val batchElement = ListBuffer[Person]()
val batch = PersonBatch(batchElement)
ds.foreach(x => {
batch.personBatch += x
if(batch.personBatch.length == batchSize) {
addToBatchList(batch)
batch.requestBatch.clear()
}
})
if(batch.personBatch.length > 0) {
addToBatchList(batch)
batch.personBatch.clear()
}
sparkSession.createDataset(batchList)
}
}
Я хочу запустить это задание в кластере Hadoop. Может ли кто-нибудь помочь мне с этим?