Apache Spark — необходимо динамически реализовать withColumn на основе результирующей карты метаданных.

У меня есть вариант использования, когда мне нужно внести некоторые исправления в уже загруженные данные. Учитывая, что метаданные находятся в hbase с логическими первичными ключами в одном семействе столбцов и columnUpdates в другом семействе столбцов. Предположим, что у меня есть отфильтрованный кадр данных с одной записью, для которой мне нужно выполнить обновление (отфильтровано с помощью искры sql). colNames и значения Colvalues ​​находятся на карте java. Я знал, что мы можем применить withColunmm для обновления или добавления новый столбец для существующего DF, но в этом случае мне нужно применить withColumn несколько раз на основе моих метаданных, т. е. количество столбцов, для которых необходимо исправить данные. я не могу сделать это в цикле for, повторяя map, поскольку кадры данных являются неизменяемыми, а также мне не рекомендуется использовать случай переключения. Также есть ограничение, которое не должно использовать scala API.

Dataset<Row> existingdata = sparksession.read
      .format(com.databricks.spark.avro)
      .load(myhdfslocation);

Map<byte[],byte[]> colUpdates = result.getFamily("TK")//result of hbase get
Set<byte[]> colUpdateKeys = colUpdates.keySet();
for(byte[] eachkey : colUpdateKeys ){
    Dataset<Row> updatedDF =  
             existingdata.withColumn(
                existingdata.col(Bytes.toString(eachkey)),
                "value from themetadatamap"
             );
}

Пока у меня есть 2 подхода: один использует случай переключения (что не оптимально, поскольку это не очень хороший способ сохранить так много случаев переключения), а другой читает метаданные hbase как искровой фрейм данных, а затем применяет искровые соединения. чтобы получить результирующий набор данных. Если кто-нибудь может предложить лучший способ реализовать этот вариант использования, это будет действительно полезно. :)


person Dijendra Pushadapu    schedule 21.04.2018    source источник
comment
Приходит ли "value from themetadatamap" из того же фрейма данных existingdata?   -  person ernest_k    schedule 21.04.2018
comment
Нет. Это значение из карты colupdates. Эта карта имеет имя столбца, которое должно быть обновлено как ключ, а значение — это значение столбца.   -  person Dijendra Pushadapu    schedule 21.04.2018
comment
Existingdata =existingdata.withColumn(.....) Эта строка вызовет ошибку повторной инициализации, поскольку фреймы данных неизменяемы, если фрейм данных инициализируется после того, как его нельзя изменить для нового значения.   -  person Dijendra Pushadapu    schedule 21.04.2018
comment
Фреймы данных неизменяемы, это означает, что вы не можете изменить объект фрейма данных. Это не означает, что вы не можете заменить значение объекта переменной фрейма данных другим объектом. existingdata = existingdata.withColumn() создаст новый фрейм данных на основе старого, а затем перезапишет значение переменной.   -  person ernest_k    schedule 21.04.2018
comment
Даже я думал аналогичным образом и сохранил аналогичный код, но во время выполнения он выдает ошибку повторной инициализации. Вы можете попробовать искровую оболочку, выполнив аналогичный код. Он не позволяет даже изменить ссылочную переменную. ..в любом случае я снова побегу и дам вам знать.   -  person Dijendra Pushadapu    schedule 22.04.2018
comment
как насчет чего-то подобного, хотя это код scala... val updatedDF = colUpdateKeys.foldLeft(existingdata)((df, name) => df.withColumn(name, «value from themetadatamap»))   -  person Sudheer Palyam    schedule 24.04.2018
comment
Tq @Эрнест Кивеле. Возможность изменить ссылочную переменную фрейма данных с помощью кода. С тем же фрагментом кода, который я не смог сделать ранее (до публикации здесь), bcz из-за конфликтов некоторых зависимостей jar. Другое изменение было внутри withColumn , я добавил второй аргумент как буквальный Obj. Теперь он работает, как ожидалось. Тк все. :)   -  person Dijendra Pushadapu    schedule 24.04.2018
comment
@DijendraPushadapu Приятно знать, спасибо!   -  person ernest_k    schedule 24.04.2018


Ответы (1)


Хотя вы не можете изменить фрейм данных, вы можете обновить переменную, ссылающуюся на него :-)

for(byte[] eachkey : colUpdateKeys ){
    existingdata =  
             existingdata.withColumn(
                existingdata.col(Bytes.toString(eachkey)),
                "value from themetadatamap"
             );
}

Это перезапишет переменную existingdata значением последнего фрейма данных в линейке, так что после цикла existingdata будет иметь все столбцы.

person ernest_k    schedule 21.04.2018