Я использую Cassandra в качестве постоянного хранилища вместе с Hazelcast 3.6, для остального API (POST). Реализация Mapstore показана ниже.
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.hazelcast.core.MapStore;
public class InventoryMapStoreImpl implements MapStore<String, Map<String, Integer>> {
public static Cluster cluster;
public static Session session;
public InventoryMapStoreImpl() {
cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
session = cluster.connect("company");
}
@Override
public synchronized Map<String, Integer> load(String key) {
Map<String, Integer> qty = new HashMap<String, Integer>();
ResultSetFuture futureList = session
.executeAsync("Select * from company.onhandinventoryavailability WHERE key='" + key + "';");
try {
if (futureList.get() != null) {
ResultSet rsFuture = futureList.get();
Row resultOne = rsFuture.one();
if (resultOne != null) {
qty = resultOne.getMap(1, String.class, Integer.class);
}
}
session.close();
cluster.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return qty;
}
@Override
public Map<String, Map<String, Integer>> loadAll(Collection<String> keys) {
return null;
}
@Override
public Iterable<String> loadAllKeys() {
return null;
}
@Override
public synchronized void store(String key, Map<String, Integer> value) {
try {
Insert insert = QueryBuilder.insertInto("onhandinventoryavailability")
.value("key", key);
insert.value("value", value);
session.execute(insert);
System.out.println("INSERTED:"+key+" INTO CASSANDRA...");
} catch (Exception e) {
System.out.println("ERORRRR");
e.printStackTrace();
}
session.close();
cluster.close();
}
@Override
public void storeAll(Map<String, Map<String, Integer>> map) {
}
@Override
public void delete(String key) {
}
@Override
public void deleteAll(Collection<String> keys) {
}
}
И запись hazelcast xml для карты.
map name="onHandInventoryAvailability">
<map-store enabled="true">
<class-name>com.company.common.hazelcast.mapstore.InventoryMapStoreImpl</class-name>
<write-delay-seconds>5</write-delay-seconds>
<write-batch-size>1000</write-batch-size>
<write-coalescing>true</write-coalescing>
</map-store>
<in-memory-format>BINARY</in-memory-format>
<backup-count>1</backup-count>
<async-backup-count>0</async-backup-count>
<time-to-live-seconds>0</time-to-live-seconds>
<max-idle-seconds>0</max-idle-seconds>
<eviction-policy>NONE</eviction-policy>
<max-size policy="PER_NODE">0</max-size>
<eviction-percentage>25</eviction-percentage>
<min-eviction-check-millis>100</min-eviction-check-millis>
<merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
<cache-deserialized-values>INDEX-ONLY</cache-deserialized-values>
</map>
Проблема в том, что когда я нажимаю POST api после задержки или по одному, данные поступают как в hazelcast, так и в cassandra, но когда я делаю больше POST, скажем, 10,100 и т. Д., Соединение с cassandra теряется из-за hazelcast, а запись за doen ' t работают должным образом. Только одна или две записи из 10 100 и т. д. идут cassandra (не все). Что-то не так в моей реализации mapstore?
session.close()
иcluster.close()
для каждого метода, но генерируете его только один раз - ни разу не использовал клиент Cassandra, что выглядит подозрительно. - person noctarius   schedule 07.06.2016