Является ли следующий код потокобезопасным

У меня есть сценарий, в котором я должен поддерживать карту, которая может быть заполнена несколькими потоками, каждый из которых изменяет там соответствующий список (уникальный идентификатор/ключ, являющийся именем потока), и когда размер списка для потока превышает фиксированный размер пакета, мы должны сохранять записи в БД.

Пример кода ниже:

private volatile ConcurrentHashMap<String, List<T>>  instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReadWriteLock lock ;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.readLock().lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if(instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if(instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.readLock().unlock();
    }

}

Через каждые 2 минуты запускается еще один отдельный поток для сохранения всех записей в карте (чтобы убедиться, что у нас что-то сохраняется каждые 2 минуты и размер карты не становится слишком большим), и когда он запускается, он блокирует все остальные потоки (проверьте используются readLock и writeLock, где writeLock имеет более высокий приоритет)

if(//Some condition) {
                    Thread.sleep(//2 minutes);
                    aggregator.getLock().writeLock().lock();
                    List<T> instrumentList = instrumentMap .values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
                    if(instrumentList.size() > 0) {

                        saver.persist(instrumentList);
                        instrumentMap .values().parallelStream().forEach(x -> x.clear());
                    aggregator.getLock().writeLock().unlock();
                }

Это решение работает нормально почти для каждого сценария, который мы тестировали, за исключением того, что иногда мы видим, что некоторые записи пропали без вести, то есть вообще не сохраняются, хотя они были добавлены в карту.

У меня вопрос, в чем проблема с этим кодом? Разве ConcurrentHashMap не лучшее решение? Есть ли здесь проблемы с использованием блокировки чтения/записи? Должен ли я перейти к последовательной обработке?


person Amit    schedule 07.08.2018    source источник
comment
Подождите, вы сохраняете данные под блокировкой read? Как вы думаете, может быть, блокировка записи была бы правильной?   -  person Andy Turner    schedule 07.08.2018
comment
Если вы используете блокировку и параллельный HashMap, вы часто делаете что-то не так.   -  person Andy Turner    schedule 07.08.2018
comment
@Энди Тернер, в чем проблема с блокировкой чтения, когда каждый поток изменяет/сохраняет свой собственный список?   -  person Amit    schedule 07.08.2018
comment
Каково ваше предложение здесь тогда? Должен ли я использовать только блокировку записи? Я не могу использовать здесь Normal HashMap   -  person Amit    schedule 07.08.2018
comment
@AndyTurner предложенное вами решение хорошее. Но есть еще одна часть проблемы, как я уже упоминал, то есть «еще один отдельный поток, запускаемый каждые 2 минуты», теперь этот поток использует writeLock, который, безусловно, отличается от вычислений атомарных операций (блокировка, используемая внутри вычислений). Что вы предлагаете, я должен использовать блокировку записи для обоих со справедливостью?   -  person Amit    schedule 10.08.2018


Ответы (1)


Нет, это не потокобезопасно.

Проблема в том, что вы используете блокировку read для ReadWriteLock. Это не гарантирует эксклюзивный доступ для внесения обновлений. Для этого вам нужно использовать блокировку write.

Но вам вообще не нужно использовать отдельный замок. Вы можете просто использовать метод ConcurrentHashMap.compute:

instrumentMap.compute(threadName, (tn, instrumentList) -> {
  if (instrumentList == null) {
    instrumentList = new ArrayList<>();
  }

  if(instrumentList.size() >= batchSize -1) {
    instrumentList.addAll(entityList); 
    recordSaver.persist(instrumentList); 
    instrumentList.clear();
  } else {
    instrumentList.addAll(entityList);
  }

  return instrumentList;
});

Это позволяет вам обновлять элементы в списке, а также гарантирует эксклюзивный доступ к списку для данного ключа.

Я подозреваю, что вы могли бы разделить вызов compute на computeIfAbsent (для добавления списка, если его там нет), а затем computeIfPresent (для обновления/сохранения списка): атомарность этих двух операций здесь не нужна. Но нет смысла их разделять.


Кроме того, instrumentMap почти наверняка не должен быть изменчивым. Если вы действительно не хотите переназначать его значение (учитывая этот код, я в этом сомневаюсь), удалите volatile и сделайте его окончательным.

Точно так же сомнительны и нефинальные блокировки. Если вы придерживаетесь использования блокировки, сделайте это также окончательным.

person Andy Turner    schedule 07.08.2018
comment
Спасибо за помощь, я попробую это. - person Amit; 07.08.2018
comment
InstrumentList почти наверняка не должен быть изменчивым. Вы имеете в виду InstrumentMap? - person Amit; 07.08.2018
comment
@Амит, да. Опечатка. - person Andy Turner; 07.08.2018
comment
Не может ли этот код вызвать проблемы, если функция переназначения вызывается дважды? Из Javadoc: The default implementation may retry these steps when multiple threads attempt updates including potentially calling the remapping function multiple times. - person Finn; 07.08.2018
comment
На самом деле, вызов его в вычислении не препятствует тому, чтобы два потока одновременно добавляли к одному и тому же instrumentList. - person Finn; 07.08.2018
comment
@FinnVoichick, вы смотрите на Javadoc ConcurrentMap, а не ConcurrentHashMap? - person Andy Turner; 07.08.2018
comment
Ах да, извините, я не понял, что ConcurrentHashMap перекрывает реализацию по умолчанию. Но гарантированно ли ConcurrentHashMap сделает это безопасно? - person Finn; 07.08.2018
comment
@FinnVoichick CHM работает compute* атомарно. - person Andy Turner; 07.08.2018
comment
@AndyTurner предложенное вами решение хорошее. Но есть еще одна часть проблемы, как я уже упоминал, то есть «еще один отдельный поток, запускаемый каждые 2 минуты», теперь этот поток использует writeLock, который, безусловно, отличается от вычислений атомарных операций. Что вы предлагаете, я должен использовать блокировку записи для обоих со справедливостью? - person Amit; 10.08.2018