Зачем использовать структуру сериализации Kryo в apache storm, будет перезаписывать данные, когда блот получает значения

Возможно, в основном разработка использовала AVRO в качестве среды сериализации в схеме Kafka и Apache Storm. Но мне нужно обрабатывать самые сложные данные, и я обнаружил, что структура сериализации Kryo также успешно интегрируется в наш проект. которые следуют среде Kafka и Apache Storm. Но когда захотелось дальнейшей эксплуатации там был странный статус.

Я отправил сообщение Кафке 5 раз, задание Storm также может прочитать 5 сообщений и успешно десериализовать. Но следующее пятно получает неверное значение данных. Там выведите то же значение, что и в последнем сообщении. Затем мне пришлось добавить распечатку после завершения кода десериализации. На самом деле это распечатка, там было 5 разных сообщений. Почему следующее пятно не может значения? Смотрите мой код ниже:

KryoScheme.java

public abstract class KryoScheme<T> implements Scheme {

private static final long serialVersionUID = 6923985190833960706L;

private static final Logger logger = LoggerFactory.getLogger(KryoScheme.class);

private Class<T> clazz;
private Serializer<T> serializer;

public KryoScheme(Class<T> clazz, Serializer<T> serializer) {
    this.clazz = clazz;
    this.serializer = serializer;
}

@Override
public List<Object> deserialize(byte[] buffer) {
    Kryo kryo = new Kryo();
    kryo.register(clazz, serializer);
    T scheme = null;
    try {
        scheme = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), this.clazz);
        logger.info("{}", scheme);
    } catch (Exception e) {
        String errMsg = String.format("Kryo Scheme failed to deserialize data from Kafka to %s. Raw: %s",
                clazz.getName(), 
                new String(buffer));
        logger.error(errMsg, e);
        throw new FailedException(errMsg, e);
    }

    return new Values(scheme);
}}

PrintFunction.java

public class PrintFunction extends BaseFunction {

private static final Logger logger = LoggerFactory.getLogger(PrintFunction.class);

@Override
public void execute(TridentTuple tuple, TridentCollector collector) {

    List<Object> data = tuple.getValues();

    if (data != null) {
        logger.info("Scheme data size: {}", data.size());
        for (Object value : data) {
            PrintOut out = (PrintOut) value;
            logger.info("{}.{}--value: {}",
                    Thread.currentThread().getName(),
                    Thread.currentThread().getId(),
                    out.toString());

            collector.emit(new Values(out));
        }
    }

}}

StormLocalTopology.java

public class StormLocalTopology {

public static void main(String[] args) {

    ........

    BrokerHosts zk = new ZkHosts("xxxxxx");
    Config stormConf = new Config();
    stormConf.put(Config.TOPOLOGY_DEBUG, false);
    stormConf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1000 * 5);
    stormConf.put(Config.TOPOLOGY_WORKERS, 1);
    stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 5);
    stormConf.put(Config.TOPOLOGY_TASKS, 1);

    TridentKafkaConfig actSpoutConf = new TridentKafkaConfig(zk, topic);
    actSpoutConf.fetchSizeBytes =  5 * 1024 * 1024 ;
    actSpoutConf.bufferSizeBytes = 5 * 1024 * 1024 ;
    actSpoutConf.scheme = new SchemeAsMultiScheme(scheme);

    actSpoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    TridentTopology topology = new TridentTopology();
    TransactionalTridentKafkaSpout actSpout = new TransactionalTridentKafkaSpout(actSpoutConf);

    topology.newStream(topic, actSpout).parallelismHint(4).shuffle()
            .each(new Fields("act"), new PrintFunction(), new Fields());

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topic+"Topology", stormConf,  topology.build());
}}

Есть и другая проблема, почему схема крио может прочитать только один буфер сообщений. Есть ли другой способ получить буфер с несколькими сообщениями, а затем можно пакетно отправлять данные в следующее пятно.

Также, если я отправлю 1 сообщение, полный поток кажется успешным.

Затем отправить 2 сообщения неправильно. распечатайте сообщение, как показано ниже:

56157 [Thread-18-spout0] INFO  s.s.a.s.s.c.KryoScheme - 2016-02-   05T17:20:48.122+0800,T6mdfEW@N5pEtNBW
56160 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Scheme data size: 1
56160 [Thread-18-spout0] INFO  s.s.a.s.s.c.KryoScheme - 2016-02-    05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56161 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56162 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Scheme data size: 1
56162 [Thread-20-b-0] INFO  s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8

person elkan1788    schedule 05.02.2016    source источник


Ответы (1)


Я сожалею, что это моя ошибка. Только что обнаружил ошибку в классе десериализации Kryo, существует параметр локальной области видимости, поэтому он может перезаписываться в многопоточной среде. Не изменяйте параметр в партийной области, код работает хорошо.

справочный код см. удар:

public class KryoSerializer<T extends BasicEvent> extends Serializer<T> implements Serializable {

private static final long serialVersionUID = -4684340809824908270L;

// It's wrong set

//private T event; 

public KryoSerializer(T event) {
    this.event = event;
}

@Override
public void write(Kryo kryo, Output output, T event) {
    event.write(output);
}

@Override
public T read(Kryo kryo, Input input, Class<T> type) {
    T event = new T();
    event.read(input);
    return event;
}
}
person elkan1788    schedule 18.02.2016