Hazelcast Write Behind с использованием Cassandra

Я использую Cassandra в качестве постоянного хранилища вместе с Hazelcast 3.6, для остального API (POST). Реализация Mapstore показана ниже.

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.hazelcast.core.MapStore;

public class InventoryMapStoreImpl implements MapStore<String, Map<String, Integer>> {

    public static Cluster cluster;
    public static Session session;

    public InventoryMapStoreImpl() {
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect("company");
    }

    @Override
    public synchronized Map<String, Integer>  load(String key) {

        Map<String, Integer> qty = new HashMap<String, Integer>();
        ResultSetFuture futureList = session
                .executeAsync("Select * from company.onhandinventoryavailability WHERE key='" + key + "';");
        try {

            if (futureList.get() != null) {
                ResultSet rsFuture = futureList.get();
                Row resultOne = rsFuture.one();
                if (resultOne != null) {
                    qty = resultOne.getMap(1, String.class, Integer.class);
                }
            }
            session.close();
            cluster.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return qty;
    }

    @Override
    public Map<String, Map<String, Integer>> loadAll(Collection<String> keys) {
        return null;
    }

    @Override
    public Iterable<String> loadAllKeys() {
        return null;
    }

    @Override
    public synchronized void store(String key, Map<String, Integer> value) {

        try {

            Insert insert = QueryBuilder.insertInto("onhandinventoryavailability")
                    .value("key", key);
              insert.value("value", value);
              session.execute(insert);
            System.out.println("INSERTED:"+key+" INTO CASSANDRA...");
        } catch (Exception e) {
            System.out.println("ERORRRR");
            e.printStackTrace();
        }
        session.close();
        cluster.close();
    }

    @Override
    public void storeAll(Map<String, Map<String, Integer>> map) {
    }

    @Override
    public void delete(String key) {
    }

    @Override
    public void deleteAll(Collection<String> keys) {
    }
}

И запись hazelcast xml для карты.

map name="onHandInventoryAvailability">
                <map-store enabled="true">
                        <class-name>com.company.common.hazelcast.mapstore.InventoryMapStoreImpl</class-name>
                        <write-delay-seconds>5</write-delay-seconds>
                        <write-batch-size>1000</write-batch-size>
                        <write-coalescing>true</write-coalescing>
                </map-store>
                <in-memory-format>BINARY</in-memory-format>
                <backup-count>1</backup-count>
                <async-backup-count>0</async-backup-count>
                <time-to-live-seconds>0</time-to-live-seconds>
                <max-idle-seconds>0</max-idle-seconds>
                <eviction-policy>NONE</eviction-policy>
                <max-size policy="PER_NODE">0</max-size>
                <eviction-percentage>25</eviction-percentage>
                <min-eviction-check-millis>100</min-eviction-check-millis>
                <merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
                <cache-deserialized-values>INDEX-ONLY</cache-deserialized-values>
         </map>

Проблема в том, что когда я нажимаю POST api после задержки или по одному, данные поступают как в hazelcast, так и в cassandra, но когда я делаю больше POST, скажем, 10,100 и т. Д., Соединение с cassandra теряется из-за hazelcast, а запись за doen ' t работают должным образом. Только одна или две записи из 10 100 и т. д. идут cassandra (не все). Что-то не так в моей реализации mapstore?


person user2966021    schedule 07.06.2016    source источник
comment
Вы вызываете session.close() и cluster.close() для каждого метода, но генерируете его только один раз - ни разу не использовал клиент Cassandra, что выглядит подозрительно.   -  person noctarius    schedule 07.06.2016
comment
Я пробовал эту штуку, не закрывая сессию и кластер, но проблема с отключением кассандры все еще существует.   -  person user2966021    schedule 07.06.2016
comment
Думаю, это больше по проблеме с Кассадрой :) Извините   -  person noctarius    schedule 07.06.2016
comment
пожалуйста, добавьте репозиторий github   -  person nitinsridar    schedule 23.09.2019


Ответы (1)


Hazelcast создаст экземпляр вашей реализации MapStore только один раз и вызовет методы load, store и т. Д. В этом экземпляре. Это означает, что вы подключаетесь к кластеру Cassandra и открываете сеанс только один раз (в конструкторе MapStore), а когда метод load или store вызывается только один раз, вы отключаетесь от кластера Cassandra. Это объясняет текущее поведение вашего кода.

Объекты Cluster & Session драйвера Cassandra должны охватывать все время существования вашего приложения [1]. Как уже было предложено в комментариях, вы не должны закрывать их, пока ваше приложение не будет закрыто.

[1] http://docs.datastax.com/en/developer/java-driver/2.0/java-driver/fourSimpleRules.html.

person Vassilis Bekiaris    schedule 23.06.2016