У меня есть сценарий, в котором я должен поддерживать карту, которая может быть заполнена несколькими потоками, каждый из которых изменяет там соответствующий список (уникальный идентификатор/ключ, являющийся именем потока), и когда размер списка для потока превышает фиксированный размер пакета, мы должны сохранять записи в БД.
Пример кода ниже:
private volatile ConcurrentHashMap<String, List<T>> instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;
public void addAll(List<T> entityList, String threadName) {
try {
lock.readLock().lock();
List<T> instrumentList = instrumentMap.get(threadName);
if(instrumentList == null) {
instrumentList = new ArrayList<T>(batchSize);
instrumentMap.put(threadName, instrumentList);
}
if(instrumentList.size() >= batchSize -1){
instrumentList.addAll(entityList);
recordSaver.persist(instrumentList);
instrumentList.clear();
} else {
instrumentList.addAll(entityList);
}
} finally {
lock.readLock().unlock();
}
}
Через каждые 2 минуты запускается еще один отдельный поток для сохранения всех записей в карте (чтобы убедиться, что у нас что-то сохраняется каждые 2 минуты и размер карты не становится слишком большим), и когда он запускается, он блокирует все остальные потоки (проверьте используются readLock и writeLock, где writeLock имеет более высокий приоритет)
if(//Some condition) {
Thread.sleep(//2 minutes);
aggregator.getLock().writeLock().lock();
List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
if(instrumentList.size() > 0) {
saver.persist(instrumentList);
instrumentMap .values().parallelStream().forEach(x -> x.clear());
aggregator.getLock().writeLock().unlock();
}
Это решение работает нормально почти для каждого сценария, который мы тестировали, за исключением того, что иногда мы видим, что некоторые записи пропали без вести, то есть вообще не сохраняются, хотя они были добавлены в карту.
У меня вопрос, в чем проблема с этим кодом? Разве ConcurrentHashMap не лучшее решение? Есть ли здесь проблемы с использованием блокировки чтения/записи? Должен ли я перейти к последовательной обработке?