Государственное хранилище Kafka Stream 0.10.2.0 получает исключение при сохранении значения

Я использую API процессора низкого уровня с хранилищем состояний, поэтому до 0.10.0.1 он работает нормально, но я обновил потоки kafka, но я получаю ошибку ниже, поэтому после этого я понял, что это связано с журналом изменений, и он смотрит на контекст записи

java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
! at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
! at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:60)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:47)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:66)
! at     org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:67)

@Override
    public void process(String arg0, List<Data> data {
        data.forEach((x) -> {
            String rawKey = x.getId();
            Data data = kvStore.get(rawKey);
            long bytesize = data == null ? 0 : data.getVolume();
            x.addVolume(bytesize);
            kvStore.put(rawKey, x);
        });
    }

public void start() {
        builder = new KStreamBuilder();
        storeSupplier =     Stores.create(getKVStoreName()).withKeys(getProcessorKeySerde()).withValues(getProcessorValueSerde()).persistent().build();
        builder.addStateStore(storeSupplier);
        stream = builder.stream(Serdes.String(), serde(),getTopicName());
        processStream(stream);
        streams = new KafkaStreams(builder, props);
        streams.cleanUp();
        streams.start();
    }

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        this.context = context;
        this.context.schedule(timeinterval);
        this.kvStore = (KeyValueStore) context.getStateStore(getKVStoreName());
    }

person Bharateshwar Patil    schedule 18.05.2017    source источник
comment
Если я отключил журнал изменений, эта проблема не возникла. Для Fail over это нехорошо :(   -  person Bharateshwar Patil    schedule 18.05.2017
comment
Вы можете поделиться своим кодом?   -  person Matthias J. Sax    schedule 18.05.2017
comment
@Override public void process (String arg0, List ‹Data› data {data.forEach ((x) - ›{String rawKey = x.getId (); Data data = kvStore.get (rawKey); long bytesize = val == null? 0: data.getVolume (); x.addVolume (bytesize); kvStore.put (rawKey, x);});}   -  person Bharateshwar Patil    schedule 19.05.2017
comment
Сложно сказать ... Вы пробовали 0.10.2.1 - там исправлено несколько ошибок. Ваши брокеры тоже на 0.10.2?   -  person Matthias J. Sax    schedule 19.05.2017
comment
да, мой брокер тоже на 0.10.2.0   -  person Bharateshwar Patil    schedule 29.05.2017
comment
Вы можете обновить свое приложение, не обновляя брокера. Я бы попробовал. (см. docs.confluent.io/current/streams/ )   -  person Matthias J. Sax    schedule 30.05.2017


Ответы (1)


Подобные исключения могут возникнуть при использовании одного и того же экземпляра Processor в нескольких потоках или разделах.

Убедитесь, что вы возвращаете новый экземпляр в ProcessorSupplier:

new ProcesorSupplier(() -> new Processor(...

То же самое относится и к Transformer и TransformerSupplier.

person amcc    schedule 03.07.2017