Заполнить ConcurrentHashMap из одного потока, а затем прочитать из нескольких потоков без каких-либо условий гонки?

У меня есть класс, в котором у меня есть ConcurrentHashMap, который обновляется одним потоком каждые 30 секунд, а затем у меня есть несколько потоков чтения, читающих из одного и того же ConcurrentHashMap, вызывая метод getNextSocket().

Ниже приведен мой одноэлементный класс, который при инициализации вызывает метод connectToSockets() для заполнения моего ConcurrentHashMap, а затем запускает фоновый поток, который обновляет одну и ту же карту каждые 30 секунд, вызывая метод updateSockets().

А затем из нескольких потоков я вызываю метод getNextSocket(), чтобы получить следующий доступный живой сокет, который использует ту же карту для получения информации. У меня также есть неизменяемый класс SocketInfo, который содержит состояние всех сокетов, независимо от того, активны они или нет.

public class SocketHolder {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<DatacenterEnum, List<SocketInfo>> liveSocketsByDc = new ConcurrentHashMap<>();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketHolder INSTANCE = new SocketHolder();
  }

  public static SocketHolder getInstance() {
    return Holder.INSTANCE;
  }

  private SocketHolder() {
   connectToSockets();
   scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  private void connectToSockets() {
    Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS;
    for (Map.Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) {
      List<SocketInfo> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      liveSocketsByDc.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketInfo> connect(DatacenterEnum dc, List<String> addresses, int socketType) {
    List<SocketInfo> socketList = new ArrayList<>();
    // ... some code here
    return socketList;
  }

  // called from multiple reader threads to get next live available socket
  public Optional<SocketInfo> getNextSocket() {
    Optional<SocketInfo> liveSocket =  getLiveSocket(liveSocketsByDc.get(DatacenterEnum.CORP));
    return liveSocket;
  }

  private Optional<SocketInfo> getLiveSocket(final List<SocketInfo> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
      Collections.shuffle(listOfEndPoints);
      for (SocketInfo obj : listOfEndPoints) {
        if (obj.isLive()) {
          return Optional.of(obj);
        }
      }
    }
    return Optional.absent();
  }

  // update CHM map every 30 seconds
  private void updateSockets() {
    Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS;

    for (Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) {
      List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey());
      List<SocketInfo> liveUpdatedSockets = new ArrayList<>();
      for (SocketInfo liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();

        boolean sent = ....;

        boolean isLive = sent ? true : false;

        // is this right here? or will it cause any race condition?
        SocketInfo state = new SocketInfo(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(state);
      }
      // update map with new liveUpdatedSockets
      liveSocketsByDc.put(entry.getKey(), liveUpdatedSockets);
    }
  }
}

Вопрос:

Является ли мой приведенный выше код потокобезопасным, и в методах updateSockets() и getNextSocket() нет условий гонки?

В моем методе updateSockets() я извлекаю List<SocketInfo> из liveSocketsByDc ConcurrentHashMap, который уже был заполнен ранее в методе connectToSockets() во время инициализации или следующего интервала в 30 секунд в методе updateSockets(), а затем повторяю тот же список liveSockets и создаю новый объект SocketInfo в зависимости от того, верно ли isLive или ложный. Затем я обновляю liveSocketsByDc ConcurrentHashMap этим новым объектом SocketInfo. Это выглядит правильно? Потому что из нескольких потоков чтения я собираюсь вызвать метод getNextSocket(), который, в свою очередь, вызывает метод getLiveSocket, который использует ту же карту для получения следующего доступного живого сокета.

Я повторяю список liveSockets, а затем создаю новый объект SocketInfo, просто изменяя поле isLive, а все остальное останется прежним. Это правильно?

Если есть проблема с безопасностью потоков, как лучше всего это исправить?


person user1234    schedule 26.02.2017    source источник
comment
Нет, это не потокобезопасно. Первое нарушение, которое я обнаружил (дальше не стал), заключается в том, что при каждом чтении общий список ArrayList извлекается из карты и перемешивается.   -  person JB Nizet    schedule 26.02.2017
comment
Этот вопрос, вероятно, лучше подходит для нашего дочернего сайта Code Review, чем здесь.   -  person Joe C    schedule 26.02.2017
comment
@JBNizet Я вижу, как мы можем исправить это в моем примере?   -  person user1234    schedule 27.02.2017
comment
Сохраните неизменяемый список, чтобы убедиться, что вы никогда случайно не сделаете это снова. И сделайте копию списка и перетасуйте эту копию.   -  person JB Nizet    schedule 27.02.2017


Ответы (1)


Здесь:

 List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey());

Ваши разные потоки потенциально пишут/читают один и тот же объект списка параллельно.

Итак: не потокобезопасный. Не помогает наличие "внешней" потокобезопасной структуры данных; когда эта поточно-ориентированная вещь содержит данные... которые не являются поточно-ориентированными; зато потом "работали" параллельно!

person GhostCat    schedule 26.02.2017
comment
Каков наилучший способ исправить эту проблему безопасности потоков в моем коде? - person user1234; 27.02.2017