Преобразование UnixTimestamp в TIMEUUID для Cassandra

Я изучаю все об Apache Cassandra 3.x.x и пытаюсь разработать некоторые вещи, с которыми можно поиграть. Проблема в том, что я хочу хранить данные в таблице Cassandra, которая содержит эти столбцы:

id (UUID - Primary Key) | Message (TEXT) | REQ_Timestamp (TIMEUUID) | Now_Timestamp (TIMEUUID)

REQ_Timestamp имеет время, когда сообщение покинуло клиент на уровне внешнего интерфейса. Now_Timestamp, с другой стороны, — это время, когда сообщение наконец сохраняется в Cassandra. Мне нужны обе временные метки, потому что я хочу измерить количество времени, которое требуется для обработки запроса от его источника до безопасного сохранения данных.

Создать Now_Timestamp легко, я просто использую функцию now(), и она автоматически генерирует TIMEUUID. Проблема возникает с REQ_Timestamp. Как я могу преобразовать эту временную метку Unix в TIMEUUID, чтобы Cassandra могла ее сохранить? Это вообще возможно?

Архитектура моего бэкенда такова: я получаю данные в формате JSON из внешнего интерфейса в веб-сервис, который их обрабатывает и сохраняет в Kafka. Затем задание Spark Streaming берет этот журнал Kafka и помещает его в Cassandra.

Это мой WebService, который помещает данные в Kafka.

@Path("/")
public class MemoIn {

    @POST
    @Path("/in")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.TEXT_PLAIN)
    public Response goInKafka(InputStream incomingData){
        StringBuilder bld = new StringBuilder();
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(incomingData));
            String line = null;
            while ((line = in.readLine()) != null) {
                bld.append(line);
            }
        } catch (Exception e) {
            System.out.println("Error Parsing: - ");
        }
        System.out.println("Data Received: " + bld.toString());

        JSONObject obj = new JSONObject(bld.toString());
        String line = obj.getString("id_memo") + "|" + obj.getString("id_writer") +
                                 "|" + obj.getString("id_diseased")
                                 + "|" + obj.getString("memo") + "|" + obj.getLong("req_timestamp");

        try {
            KafkaLogWriter.addToLog(line);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return Response.status(200).entity(line).build();
    }


}

Вот мой писатель Кафка

    package main.java.vcemetery.webservice;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;

public class KafkaLogWriter {

    public static void addToLog(String memo)throws Exception {
        // private static Scanner in;
            String topicName = "MemosLog";

            /*
            First, we set the properties of the Kafka Log
             */
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // We create the producer
            Producer<String, String> producer = new KafkaProducer<>(props);
            // We send the line into the producer
            producer.send(new ProducerRecord<>(topicName, memo));
            // We close the producer
            producer.close();

    }
}

И, наконец, вот что у меня есть о моей работе в Spark Streaming

public class MemoStream {

    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);
        Logger.getLogger("akka").setLevel(Level.ERROR);

        // Create the context with a 1 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkExample").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        /* Se crea un array con los tópicos a consultar, en este caso solamente un tópico */
        Collection<String> topics = Arrays.asList("MemosLog");

        final JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        kafkaStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
        // Split each bucket of kafka data into memos a splitable stream
        JavaDStream<String> stream = kafkaStream.map(record -> (record.value().toString()));
        // Then, we split each stream into lines or memos
        JavaDStream<String> memos = stream.flatMap(x -> Arrays.asList(x.split("\n")).iterator());
        /*
         To split each memo into sections of ids and messages, we have to use the code \\ plus the character
          */
        JavaDStream<String> sections = memos.flatMap(y -> Arrays.asList(y.split("\\|")).iterator());
        sections.print();
        sections.foreachRDD(rdd -> {
           rdd.foreachPartition(partitionOfRecords -> {
               //We establish the connection with Cassandra
               Cluster cluster = null;
               try {
                   cluster = Cluster.builder()
                           .withClusterName("VCemeteryMemos") // ClusterName
                           .addContactPoint("127.0.0.1") // Host IP
                           .build();

               } finally {
                   if (cluster != null) cluster.close();
               }
               while(partitionOfRecords.hasNext()){


               }
           });
        });

        ssc.start();
        ssc.awaitTermination();

    }
}

Заранее спасибо.


person franpen    schedule 14.09.2017    source источник


Ответы (1)


В Cassandra нет функции преобразования из метки времени UNIX. Вы должны сделать преобразование на стороне клиента.

Ссылка: https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timeuuid_functions_r.html

person Simon Fontana Oscarsson    schedule 14.09.2017
comment
На самом деле я ссылался на тот же документ, когда задавал вопрос. Любые идеи о том, как сделать преобразование на стороне клиента? Я застрял здесь. - person franpen; 14.09.2017
comment
Зависит от того, какой клиент вы используете. Это datastax javadriver? Возможно, вы можете показать часть своего кода того, что вы уже делаете. - person Simon Fontana Oscarsson; 14.09.2017
comment
Архитектура моего бэкенда такова: я получаю данные в формате JSON из внешнего интерфейса в веб-сервис, который их обрабатывает и сохраняет в Kafka. Затем задание Spark Streaming берет этот журнал Kafka и помещает его в Cassandra. Я отредактирую свой исходный пост с кодом WebService/Kafka и кодом Spark, который я написал до сих пор. - person franpen; 14.09.2017
comment
У меня нет опыта работы со Spark, но, возможно, это будет полезно: stackoverflow.com/questions/32905698/ - person Simon Fontana Oscarsson; 14.09.2017
comment
Это не совсем то, чем я хочу заниматься, но это приближается. Спасибо большое за вашу помощь! - person franpen; 14.09.2017