Параллелизм Kotlin для ConcurrentHashMap

Я пытаюсь поддерживать параллелизм на хэш-карте, которая периодически очищается. У меня есть кеш, который хранит данные в течение определенного периода времени. Через каждые 5 минут данные из этого кеша отправляются на сервер. После сброса хочу очистить кеш. Проблема в том, что когда я сбрасываю, данные потенциально могут быть записаны на эту карту, пока я делаю это с помощью существующего ключа. Как мне сделать этот поток безопасным?

data class A(val a: AtomicLong, val b: AtomicLong) {
   fun changeA() {
      a.incrementAndGet()
   }
}

class Flusher {
   private val cache: Map<String, A> = ConcurrentHashMap()
   private val lock = Any()
   fun retrieveA(key: String){
       synchronized(lock) {
          return cache.getOrPut(key) { A(key, 1) }
       }
   }
 
   fun flush() {
      synchronized(lock) {
           // send data to network request
           cache.clear()
      }
   }
}

// Existence of multiple classes like CacheChanger
class CacheChanger{
  fun incrementData(){
      flusher.retrieveA("x").changeA()
  }
}

Я беспокоюсь, что вышеуказанный кеш не синхронизирован должным образом. Есть ли лучшие / правильные способы заблокировать этот кеш, чтобы я не потерял данные? Должен ли я создать глубокую копию кеша и очистить его?

Поскольку вышеуказанные данные могут быть изменены другим чейнджером, не может ли это привести к проблемам?


person LateNightDev    schedule 21.06.2020    source источник
comment
Существуют ли какие-либо другие функции, кроме извлечения и сброса, которые изменяют карту? Обе эти функции синхронизируются на одном и том же замке, так в чем проблема, которой вы опасаетесь?   -  person ciamej    schedule 22.06.2020
comment
Кроме того, зачем вы используете ConcurrentHashMap, если все ваши доступы синхронизированы?   -  person ciamej    schedule 22.06.2020
comment
ConcurrentHashMap сам по себе является потокобезопасным. Также метод расширения getOrPut кажется потокобезопасным (на основе документов). Если нет никаких других методов, которые модифицируют карту не потокобезопасным способом - вы можете избавиться от этой блокировки.   -  person Michał Krzywański    schedule 22.06.2020
comment
Проблема в том, что значение класса А может быть изменено. Что, если значение класса A изменится, и я его удалю. Я обновлю пример.   -  person LateNightDev    schedule 22.06.2020
comment
@michalik OP небезопасно избавляться от блокировки, потому что сброс должен быть атомарным - всю карту необходимо прочитать, а затем очистить, и никакие записи не могут чередоваться с этим процессом.   -  person ciamej    schedule 22.06.2020
comment
Я обновил класс примером.   -  person LateNightDev    schedule 22.06.2020
comment
Возможно, вам придется сделать тип cache ConcurrentMap, чтобы получить правильное поведение параллелизма.   -  person Louis Wasserman    schedule 22.06.2020


Ответы (2)


Вы можете избавиться от блокировки.

В методе сброса вместо того, чтобы читать всю карту (например, через итератор) и затем очищать ее, удаляйте каждый элемент один за другим.

Я не уверен, что вы можете использовать метод удаления итератора (сейчас я проверю это), но вы можете выполнить итерацию по набору ключей, и для каждого ключа вызывать cache.remove() - это даст вам значение хранится и удаляется из кеша атомарно.

Сложная часть заключается в том, как убедиться, что объект класса A не будет изменен непосредственно перед отправкой по сети... Вы можете сделать это следующим образом:

Когда вы получаете от x до retrieveA и изменяете объект, вам нужно убедиться, что он все еще находится в кеше. Просто вызовите функцию извлечения еще раз. Если вы получаете точно такой же объект, все в порядке. Если отличается, то это означает, что объект был удален и отправлен по сети, но вы не знаете, была ли также отправлена ​​модификация или было отправлено состояние объекта до модификации. Тем не менее, я думаю, в вашем случае вы можете просто повторить весь процесс (применить изменение и проверить, совпадают ли объекты). Но это зависит от специфики вашего приложения.

Если вы не хотите увеличиваться дважды, то при отправке данных по сети вам придется прочитать содержимое счетчика a, сохранить его в какой-то локальной переменной и уменьшить a на эту величину (обычно она будет равна нулю) . Затем в CacheChanger, когда вы получаете другой объект из второго извлечения, вы можете проверить, равно ли значение нулю (ваша модификация была принята во внимание) или ненулевому, что означает, что ваша модификация опоздала всего на долю секунды, и вам придется повторить процесс.

Вы также можете заменить incrementAndGet на compareAndSwap, но это может немного ухудшить производительность. В этом подходе вместо увеличения вы пытаетесь поменять местами значение, которое больше на единицу. И перед отправкой по сети вы пытаетесь поменять значение на -1, чтобы обозначить значение как недопустимое. Если второй обмен не удался, это означает, что кто-то изменил значение одновременно, вам нужно проверить его еще раз, чтобы отправить самое свежее значение по сети, и вы повторяете процесс в цикле (разрывая цикл только после того, как обмен на -1 успешно). В случае замены на единицу больше вы также повторяете процесс в цикле до тех пор, пока замена не завершится успешно. Если это не удается, это либо означает, что кто-то другой переключился на какое-то большее значение, либо Flusher поменялся на -1. В последнем случае вы знаете, что вам нужно вызвать retrieveA еще раз, чтобы получить новый объект.

person ciamej    schedule 21.06.2020
comment
Зачем вам нужно перебирать все записи и имитировать поведение метода ConcurrentHashMap::clear, если он уже предоставлен и гарантирует потокобезопасность наряду с другими методами в ConcurrentHashMap? Вы в основном отказываетесь от преимуществ concurenthashmap в пользу внешней синхронизации. - person Michał Krzywański; 22.06.2020
comment
@michalik Потому что clear не возвращает содержимое карты. Вы хотите сделать это атомарно, получить содержимое и удалить его одновременно. - person ciamej; 22.06.2020
comment
@michalk см. комментарий выше - person ciamej; 22.06.2020
comment
Да, я пропустил тот комментарий об отправке данных по сети. В этом случае ваше решение будет работать, но решение OP (с блокировкой) тоже будет работать (насколько необходимо сохранить атомарность этих двух операций), но ему также нужно будет удерживать блокировку при изменении значений (например, выставляя метод в Flusher для изменения). Кроме того, решение, которое вы описываете (похоже на тип алгоритма CAS), может потребовать CacheChanger выполнить проверку CAS в случае изменения данных. Но в целом это действительно зависит от специфики приложения. - person Michał Krzywański; 22.06.2020
comment
@michalk да, альтернативой для OP было бы сохранение блокировки, изменение карты на обычную (нет необходимости в ConcurrentHashMap), но применение всех изменений во время извлечения метода при сохранении блокировки. - person ciamej; 22.06.2020
comment
@michalk, вы можете опубликовать это как ответ, если хотите, потому что это полностью правильный подход, но основанный на блокировках (поэтому еще проще рассуждать). - person ciamej; 22.06.2020
comment
@michalk хорошо, не обращай внимания на мой последний комментарий, я уже сделал это сам - person ciamej; 22.06.2020

Самое простое решение (но с худшей производительностью) — полностью полагаться на блокировки.

Вы можете изменить ConcurrentHashMap на обычный HashMap.

Затем вы должны применить все свои изменения непосредственно в функции получения:

fun retrieveA(key: String, mod: (A) -> Unit): A {
    synchronized(lock) {
        val obj: A = cache.getOrPut(key) { A(key, 1) }
        mod(obj)
        cache.put(obj)
        return obj
    }
}

Надеюсь, он скомпилируется (я не эксперт по Kotlin).

Затем вы используете его как:

class CacheChanger {
    fun incrementData() {
        flusher.retrieveA("x") { it.changeA() }
    }
}

Хорошо, я признаю, что этот код на самом деле не Kotlin ;) вы должны использовать лямбду Kotlin вместо интерфейса Consumer. Прошло некоторое время с тех пор, как я немного поиграл с Kotlin. Если кто-то может исправить это, я был бы очень благодарен.

person ciamej    schedule 21.06.2020