Scala - API мониторинга Flink (загрузка заданий)

Добрый день, у меня проблема с загрузкой вакансий в Flink API с помощью Scala

Все запросы Get работают

import scalaj.http._ 
val url: String = "http://127.0.0.1:8081"
val response: HttpResponse[String] = Http(url+"/config").asString
return response

Когда я пытаюсь загрузить файл JAR через CURL (работает)

curl -vvv -X POST -H "Expect:" -F "jarfile=@/home/Downloads/myJob.jar" http://127.0.0.1:8081/jars/upload

Теперь я хотел бы загрузить с помощью SCALA. В документации нет рабочего примера, и я новичок в этом типе сообщений: https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#submitting-programs

В настоящее время мой код (не работает): Взято из: https://github.com/Guru107/flinkjobuploadplugin/tree/master/src/main/java/com/github/guru107 - отредактировано к моим потребностям

// Ideal Case is to upload a Jar File as a multipart in Scala
  import java.io.IOException
  import org.apache.http.client.methods.HttpPost
  import org.apache.http.entity.mime.MultipartEntityBuilder
  import org.apache.http.impl.client.{HttpClients, LaxRedirectStrategy}
  import org.apache.http.message.BasicHeader
  import org.apache.http.util.EntityUtils


  val requestUrl = "http://localhost:8081/jars/upload"
  val jarPath = "@/home/Downloads/myJob.jar"

  val httpClient: CloseableHttpClient = HttpClients.custom.setRedirectStrategy(new LaxRedirectStrategy).build

  val fileToUpload: File = new File(jarPath)
  val uploadFileUrl: HttpPost = new HttpPost(requestUrl)

  val builder: MultipartEntityBuilder = MultipartEntityBuilder.create
  builder.addBinaryBody("jarfile", fileToUpload)

  val multipart: HttpEntity = builder.build
  var jobUploadResponse: JSONObject = null

  uploadFileUrl.setEntity(multipart)
  var response: CloseableHttpResponse  = null
  try {
    response = httpClient.execute(uploadFileUrl)
    println("response: " + response)
    response.setHeader(new BasicHeader("Expect", ""))
    response.setHeader(new BasicHeader("content-type", "application/x-java-archive"))
    val bodyAsString = EntityUtils.toString(response.getEntity, "UTF-8")
    println("bodyAsString: " + bodyAsString)
    jobUploadResponse = new JSONObject(bodyAsString)
    println("jobUploadResponse: " + jobUploadResponse)
  }

Не удается загрузить файл.

Пожалуйста, предоставьте рабочий пример или ссылку на пример scala для загрузки файла задания / jar для мигания в scala

Заранее спасибо


person Sub Zero    schedule 17.10.2017    source источник


Ответы (1)


Вы можете использовать клиентский код из com.github.mjreid.flinkwrapper

И загрузите файл jar с кодом Scala:

val apiEndpoint: String = as.settings.config.getString("flink.url") //http://<flink_web_host>:<flink_web_port>
val client = FlinkRestClient(apiEndpoint, as)
client.runProgram(<jarId>)
person enrique-carbonell    schedule 12.09.2018