Я использую слияние для импорта данных из kafka в hive, пытаясь сделать то же самое: Записи сегмента на основе времени (kafka-hdfs-connector)
мой конфиг стока такой:
{
"name":"yangfeiran_hive_sink_9",
"config":{
"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
"topics":"peoplet_people_1000",
"name":"yangfeiran_hive_sink_9",
"tasks.max":"1",
"hdfs.url":"hdfs://master:8020",
"flush.size":"3",
"partitioner.class":"io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
"partition.duration.ms":"300000",
"path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm/",
"locale":"en",
"logs.dir":"/tmp/yangfeiran",
"topics.dir":"/tmp/yangfeiran",
"hive.integration":"true",
"hive.metastore.uris":"thrift://master:9083",
"schema.compatibility":"BACKWARD",
"hive.database":"yangfeiran",
"timezone": "UTC",
}
}
Все работает нормально, я вижу, что данные находятся в hdfs, таблица создается в улье, за исключением случаев, когда я использую «выбрать * из ян», чтобы проверить, есть ли данные уже в улье.
Он печатает ошибку:
FAILED: SemanticException Невозможно определить, зашифрован ли hdfs://master:8020/tmp/yangfeiran/people_people_1000: java.lang.IllegalArgumentException: Wrong FS: hdfs://master:8020/tmp/yangfeiran/peoplet_people_1000, ожидается: hdfs: //nsstargate
Как решить эту проблему?
Фейран