Пакетная обработка набора данных Spark scala

Я пытаюсь создать партии строк Dataset в Spark. Для поддержания количества записей, отправленных в службу, я хочу группировать элементы, чтобы я мог поддерживать скорость, с которой будут отправляться данные. За,

case class Person(name:String, address: String)
case class PersonBatch(personBatch: List[Person])

Для данного Dataset[Person] я хочу создать Dataset[PersonBatch]

Например, если вход Dataset[Person] содержит 100 записей, вывод Dataset должен быть похож на Dataset[PersonBatch], где каждый PersonBatchдолжен быть списком из n записей (человек).

Я пробовал это, но это не сработало.

object DataBatcher extends Logger {

  var batchList: ListBuffer[PersonBatch] = ListBuffer[PersonBatch]()
  var batchSize: Long = 500  //default batch size

  def addToBatchList(batch: PersonBatch): Unit = {
    batchList += batch
  }

  def clearBatchList(): Unit = {
    batchList.clear()
  }

  def createBatches(ds: Dataset[Person]): Dataset[PersonBatch] = {

    val dsCount = ds.count()
    logger.info(s"Count of dataset passed for creating batches : ${dsCount}")
    val batchElement = ListBuffer[Person]()
    val batch = PersonBatch(batchElement)
    ds.foreach(x => {
      batch.personBatch += x
      if(batch.personBatch.length == batchSize) {
        addToBatchList(batch)
        batch.requestBatch.clear()
      }
    })
    if(batch.personBatch.length > 0) {
      addToBatchList(batch)
      batch.personBatch.clear()
    }
    sparkSession.createDataset(batchList)
  }  
}

Я хочу запустить это задание в кластере Hadoop. Может ли кто-нибудь помочь мне с этим?


person Abhishek Tripathi    schedule 16.11.2017    source источник


Ответы (1)


rdd.iterator имеет сгруппированную функцию, которая может быть вам полезна.

Например :

iter.grouped (размер партии)

Пример фрагмента кода, который выполняет пакетную вставку с iter.grouped(batchsize) здесь его 1000, и я пытаюсь вставить в базу данных

   df.repartition(numofpartitionsyouwant) // numPartitions ~ number of simultaneous DB connections you can planning to give...
def insertToTable(sqlDatabaseConnectionString: String,
                  sqlTableName: String): Unit = {

  val tableHeader: String = dataFrame.columns.mkString(",")
  dataFrame.foreachPartition { partition =>
    //NOTE : EACH PARTITION ONE CONNECTION (more better way is to use connection pools)
    val sqlExecutorConnection: Connection =
      DriverManager.getConnection(sqlDatabaseConnectionString)
    //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
    partition.grouped(1000).foreach { group =>
      val insertString: scala.collection.mutable.StringBuilder =
        new scala.collection.mutable.StringBuilder()

      group.foreach { record =>
        insertString.append("('" + record.mkString(",") + "'),")
      }

      sqlExecutorConnection
        .createStatement()
        .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
          + insertString.stripSuffix(","))
    }

    sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
  }
}
person Ram Ghadiyaram    schedule 16.11.2017