Является ли этот код потокобезопасным?

Я хочу обработать поток клиентских запросов. Каждый запрос имеет свой особый тип. Сначала мне нужно инициализировать некоторые данные для этого типа, и после этого я могу начать обрабатывать запросы. Когда тип клиента приходит впервые, я просто инициализирую соответствующие данные. После этого все последующие запросы этого типа обрабатываются с использованием этих данных.

Мне нужно сделать это потокобезопасным способом.

Вот код, который я написал. Является ли он потокобезопасным?

public class Test {

    private static Map<Integer, Object> clientTypesInitiated = new ConcurrentHashMap<Integer, Object>();

    /* to process client request we need to 
    create corresponding client type data.
    on the first signal we create that data, 
    on the second - we process the request*/

    void onClientRequestReceived(int clientTypeIndex) {
        if (clientTypesInitiated.put(clientTypeIndex, "") == null) {
            //new client type index arrived, this type was never processed
            //process data for that client type and put it into the map of types
            Object clientTypeData = createClientTypeData(clientTypeIndex);
            clientTypesInitiated.put(clientTypeIndex, clientTypeData);
        } else {
            //already existing index - we already have results and we can use them
            processClientUsingClientTypeData(clientTypesInitiated.get(clientTypeIndex));
        }
    }

    Object createClientTypeData(int clientIndex) {return new Object();}

    void processClientUsingClientTypeData(Object clientTypeData) {}
}

С одной стороны, ConcurrentHashMap не может создать map.put(A,B) == null два раза для одного и того же A. С другой стороны, операция присваивания и сравнения не является потокобезопасной.

Так этот код в порядке? Если нет, то как я могу это исправить?

ОБНОВЛЕНИЕ: я принял ответ Мартина Серрано, потому что его код является потокобезопасным и не подвержен проблемам с двойной инициализацией. Но я хотел бы отметить, что я не обнаружил проблем с моей версией, опубликованной в качестве ответа ниже, и моя версия не требует синхронизации.


person KutaBeach    schedule 18.09.2012    source источник


Ответы (8)


Этот код не является потокобезопасным, потому что

//already existing index - we already have results and we can use them
processClientUsingClientTypeData(clientTypesInitiated.get(clientTypeIndex));

имеет шанс получить значение "", которое вы временно вставляете в чек пут.

Этот код можно сделать потокобезопасным таким образом:

public class Test {

    private static Map<Integer, Object> clientTypesInitiated = new ConcurrentHashMap<Integer, Object>();

    /* to process client request we need to 
       create corresponding client type data.
       on the first signal we create that data, 
       on the second - we process the request*/

void onClientRequestReceived(int clientTypeIndex) {
    Object clientTypeData = clientTypesInitiated.get(clientTypeIndex);
    if (clientTypeData == null) {
        synchronized (clientTypesInitiated) {
          clientTypeData = clientTypesInitiated.get(clientTypeIndex);
          if (clientTypeData == null) {
              //new client type index arrived, this type was never processed
              //process data for that client type and put it into the map of types
              clientTypeData = createClientTypeData(clientTypeIndex);
              clientTypesInitiated.put(clientTypeIndex, clientTypeData);
          }
        }
    }
    processClientUsingClientTypeData(clientTypeData);
}

Object createClientTypeData(int clientIndex) {return new Object();}

void processClientUsingClientTypeData(Object clientTypeData) {}

}

person Martin Serrano    schedule 18.09.2012
comment
Я думаю, вам нужно else перед processClientUsingClientTypeData. - person Gray; 18.09.2012
comment
Это также не потокобезопасно! Посмотрите: один поток выполняет get, сравнивает с нулем, переходит в if, затем другой поток делает то же самое, затем один присваивает значение clientTypeData и, в конце концов, возвращает значение из метода, но другой поток переопределяет clientTypeData. - person Krzysztof Jabłoński; 18.09.2012
comment
@Krzysztof, что небезопасно? как я уже сказал, если вы не возражаете против того, чтобы иногда выполнять createClientTypeData более одного раза, это нормально. в случае, который вы описываете, второй поток просто заменит результат первого. предположительно, поскольку любой запрос может инициировать инициализацию, он каждый раз будет давать одни и те же результаты. если инициализация действительно дорогая, то можно добавить больше сложности, чтобы избежать этого. - person Martin Serrano; 18.09.2012
comment
@MartinSerrano Посмотрите: null на карте, два потока выполняют onClientRequestReceived, оба вызывают get, извлекая этот null, и попадают в if. Там, оказавшись в if, оба создают экземпляр. Плохо. Один создает и ставит сначала, оставляет if и работает с данными. Затем второй создает еще один экземпляр, помещает, предполагая, что все еще есть нуль, что переопределяет предыдущее значение. А затем обрабатывает этот новый экземпляр. - person Krzysztof Jabłoński; 19.09.2012
comment
@Gray Нет необходимости в else, если предполагается, что обработка данных также необходима, даже если она была только что создана. - person Krzysztof Jabłoński; 19.09.2012
comment
@Krzysztof, я согласен. именно это и произошло бы. но это не значит, что это небезопасно. если эта структура данных действует как кеш (мое предположение), то это не небезопасно, а просто потенциально расточительно. - person Martin Serrano; 19.09.2012
comment
Если между вызовами действительно нет помех, то для чего нужна вся эта «потокобезопасность»? В этом случае я бы использовал ThreadLocal. Однако, цитируя вопрос: ‹‹Когда тип клиента приходит в первый раз, я просто инициализирую соответствующие данные. После этого все последующие запросы этого типа обрабатываются с использованием этих данных.›› Я предполагаю, что это означает единственный созданный экземпляр. - person Krzysztof Jabłoński; 19.09.2012
comment
справедливо. я обновил свой пример, так что инициализация происходит только один раз. у него все еще есть преимущество, заключающееся в том, что ему не нужно синхронизироваться после того, как произошла инициализация. - person Martin Serrano; 19.09.2012
comment
Теперь гораздо лучше. Выглядит более сложным в коде, но действительно эффективен после инициализации. +1 - person Krzysztof Jabłoński; 19.09.2012
comment
Ах да, я только что упомянул блокировку с двойной проверкой, и она здесь! :) Вероятно, это лучший вариант, Мартин, но, пожалуйста, прокомментируйте мой вариант ниже. Krzysztof, вы поняли, я не могу запускать код инициализации несколько раз. - person KutaBeach; 19.09.2012
comment
@KutaBeach, извините, я пока не могу комментировать! но у вашего решения есть недостаток: если инициализация началась, но не завершена, и поступает новый запрос, то isInitStarted не будет равен 0, а isInitFinished будет равен 0, так что этот запрос провалится и ничего не сделает. - person Martin Serrano; 19.09.2012
comment
Мартин, вы правы, на самом деле мой вопрос не определяет поведение потоков, которые пришли во время инициализации данных, поэтому мы можем добавить третий if и ждать там, или просто провалиться, что угодно. - person KutaBeach; 19.09.2012

Нет, я не думаю, что это все еще потокобезопасно.

Вам нужно обернуть операцию ввода в синхронизированный блок.

Согласно javadoc для ConcurrentHashMap.

Операции извлечения (включая получение) обычно не блокируются, поэтому могут пересекаться с операциями обновления (включая размещение и удаление).

person kosa    schedule 18.09.2012
comment
это не так для ConcurrentMap, он все еще может иметь потокобезопасность и без какой-либо синхронизации, что делает ConcurrentMap устаревшим. - person jdevelop; 18.09.2012
comment
@jdevelop: Не могли бы вы уточнить? - person kosa; 18.09.2012
comment
Он должен использовать метод putIfAbsent ConcurrentMap - person jdevelop; 18.09.2012
comment
@Nambari Я считаю, что то, что в вашей цитате может перекрываться, означает, что они предназначены для безопасного перекрытия, а не могут, но не должны перекрываться. Если бы это было так, было бы излишне говорить об этом, поскольку это может произойти с любым объектом данных, а не только с параллельным объектом данных. - person kjw0188; 19.09.2012
comment
Согласно документам, get вернет результаты завершенных обновлений, поэтому, если одна операция put все еще продолжается, она ничего не вернет, и в нашем случае она запустит вторую процедуру обновления. - person KutaBeach; 19.09.2012

вы должны использовать putIfAbsent здесь, семантика этой операции похожа на CAS, и она точно атомарна. А поскольку он атомарный, у вас не будет проблем с внутренними присваиваниями и сравнениями.

Текущий код не является потокобезопасным.

person jdevelop    schedule 18.09.2012
comment
Не будет ли putIfAbsent требовать создания данных, прежде чем проверять, есть ли что-нибудь? В основном это было бы так, поэтому это связано с множеством ненужных инициализаций. - person Krzysztof Jabłoński; 18.09.2012
comment
Хорошая идея, но действительно, как сказал Кшиштоф, в этом случае мне придется запускать процедуру инициализации несколько раз. - person KutaBeach; 19.09.2012
comment
вместо того, чтобы ставить реальный объект - ставьте Future, и вы в безопасности. - person jdevelop; 19.09.2012

ConcurrentHashMap создан, чтобы не использовать синхронизированный блок, хотя использование его в качестве блокировки для синхронизированного блока допустимо, но неэффективно. Если вам нужно использовать ConcurrentHashMap, правильное использование выглядит следующим образом;

private static Map<Integer, Object> clientTypesInitiated = new ConcurrentHashMap<Integer, Object>();

void onClientRequestReceived(int clientTypeIndex) {
    Object clientTypeData = clientTypesInitiated.get(clientTypeIndex);
    if (clientTypeData == null) {
        Object newClientTypeData = createClientTypeData(clientTypeIndex);
        clientTypeData = clientTypesInitiated.putIfAbsent(clientTypeIndex, newClientTypeData);
        // if clientTypeData is null, then put successful, otherwise, use the old one
        if (clientTypeData == null) {
            clientTypeData = newClientTypeData;
        }
    }
    processClientUsingClientTypeData(clientTypeData);
}

Честно говоря, приведенный выше код может создать clientTypeData несколько раз (из-за состояния гонки), хотя он не использует блок synchronized. Следовательно, вы должны измерить, насколько дорого создать clientTypeData, и если это настолько дорого, что затмит не используя блок synchronized, то вам даже не нужно использовать ConcurrentHashMap. Используйте обычные блоки HashMap и synchronized.

person kangster2    schedule 29.10.2012

Нет, если вам нужно, чтобы весь метод был потокобезопасным, объявите его синхронизированным или используйте синхронизированный блок для размещения вашего критического кода.

person Community    schedule 18.09.2012
comment
Почему? Вам нужно предоставить информацию, а не просто ответить на простой вопрос. - person Gray; 19.09.2012
comment
чтобы обеспечить безопасность потоков, для получения основной информации Google — ваш лучший друг docs.oracle.com/javase/tutorial/essential/concurrency/ - person F. Mayoral; 19.09.2012
comment
Вы упускаете мою мысль. Я знаю ответ. Ответы SO предназначены для потомков и должны быть назначениями Google, чтобы не указывать людям на Google. Ваши ответы не будут проголосованы, если они не содержат больше информации. - person Gray; 19.09.2012
comment
если знаешь, то зачем спрашиваешь? я здесь не ради очков, вопрос был в том, является ли этот код потокобезопасным?, я ответил и объяснил, что сделает его потокобезопасным, этот парень, очевидно, знает концепцию, иначе он бы никогда не задал этот вопрос, я не буду тратить свою энергию объяснение того, как работает синхронизация потоков, когда есть буквально сотни документов, объясняющих это. Если вам не нравится мой ответ, вы всегда можете опубликовать свой собственный или улучшить этот. - person F. Mayoral; 19.09.2012
comment
@ fer13488 Параллельная хэш-карта Java не обязательно должна иметь синхронизированный блок, чтобы быть потокобезопасной. Однако даже несмотря на то, что базовая структура данных является потокобезопасной, алгоритм поверх нее может иметь порядок выполнения, что может привести к плохому состоянию. Использование синхронизированного блока противоречит цели использования потокобезопасной структуры данных. - person kjw0188; 19.09.2012

Текущий код не является потокобезопасным, поскольку между if и next put может произойти переключение контекста. вам нужно сделать clientTypesInitiated как тип ConcurrentHashMap, а не карту интерфейса. Затем используйте putIfAbsent

person Arnon Rotem-Gal-Oz    schedule 18.09.2012

Я думаю, что код вообще не работает так, как задумано. Смотреть:

void onClientRequestReceived(int clientTypeIndex) {
     // the line below ensures the existing data is lost even if was not null
    if (clientTypesInitiated.put(clientTypeIndex, "") == null) {
        Object clientTypeData = createClientTypeData(clientTypeIndex);
        clientTypesInitiated.put(clientTypeIndex, clientTypeData);
    } else {
        processClientUsingClientTypeData(clientTypesInitiated.get(clientTypeIndex));
    }
}

Я предполагаю, что вам скорее нужен вызов метода get.

Я бы предложил такой подход:

void onClientRequestReceived(int clientTypeIndex) {
    Object clientTypeData;
    synchronized (clientTypesInitiated) {
        if ((clientTypeData = clientTypesInitiated.get(clientTypeIndex)) == null) {
            clientTypeData = createClientTypeData(clientTypeIndex);
            clientTypesInitiated.put(clientTypeIndex, clientTypeData);
        } 
    }
    processClientUsingClientTypeData(clientTypeData);
}
person Krzysztof Jabłoński    schedule 18.09.2012

putIfAbsent прекрасно работает, но иногда требуется повторная инициализация несколько раз. Вероятно, эту проблему может решить синхронизированный блок с блокировкой с двойной проверкой.

Но вот еще вариант на основе AtomicIntegerArrays:

public class Test {

    private static Map<Integer, Object> clientTypesInitiated = new ConcurrentHashMap<Integer, Object>();

    private static final int CLIENT_TYPES_NUMBER = 10;
    private static AtomicIntegerArray clientTypesInitStarted = new AtomicIntegerArray(CLIENT_TYPES_NUMBER);
    private static AtomicIntegerArray clientTypesInitFinished = new AtomicIntegerArray(CLIENT_TYPES_NUMBER);

    void onClientRequestReceived(int clientTypeIndex) {

        int isInitStarted = clientTypesInitStarted.getAndIncrement(clientTypeIndex);
        int isInitFinished = clientTypesInitFinished.get(clientTypeIndex);

        if (isInitStarted == 0) {
            //new client type index arrived, this type was never processed
            //process data for that client type and put it into the map of types
            Object clientTypeData = createClientTypeData(clientTypeIndex);
            clientTypesInitiated.put(clientTypeIndex, clientTypeData);
            clientTypesInitFinished.getAndIncrement(clientTypeIndex);
        } else if (isInitFinished > 0) {
            //already existing index - we already have results and we can use them
            processClientUsingClientTypeData(clientTypesInitiated.get(clientTypeIndex));
        }
    }

    Object createClientTypeData(int clientIndex) {
        return new Object();
    }

    void processClientUsingClientTypeData(Object clientTypeData) {
    }
}

Есть мнения?

person KutaBeach    schedule 18.09.2012
comment
Я думаю, вам следует рассмотреть возможность разработки более объектно-ориентированного кода. Использование ссылки Object не очень красивое решение. Создайте абстрактный класс ClientData и производный от него, реализующий конкретные классы. Создайте фабричный класс для тех, кто создает их экземпляры в зависимости от request.class. Код, который имеет перечисления типов и индексацию в целом, сложен в сопровождении и расширении. - person Krzysztof Jabłoński; 19.09.2012
comment
Извините за беспорядок, Krzysztof, Object - это просто муляж, который нужно показать как ClientData, конечно, так быть не должно. Что касается классов - на самом деле каждый тип клиента - это просто индекс ресурса, типы клиентов вообще не отличаются. Мне нужно инициировать некоторый ресурс перед обработкой клиентов, которые относятся к этому ресурсу. Да, вместо clientType лучше было бы написать resourceIndex. - person KutaBeach; 19.09.2012
comment
И теперь, ссылаясь на фактический код, я боюсь, что он глючит. Посмотрите: первый поток выполняется в if. Второй прогоняет метод, пропускает первый if. Затем он терпит неудачу при втором сравнении, а также пропускает else if, таким образом фактически ничего не делая. - person Krzysztof Jabłoński; 19.09.2012
comment
Да, именно, Кшиштоф. Нужно что-то делать в случае, когда второй поток пришел, а первый еще инициализирует данные. Мы можем поставить третье if для этого случая и ждать в этом if. Но я решил просто пропустить этот запрос. - person KutaBeach; 19.09.2012
comment
Ожидание потока возможно, когда есть объект монитора для ожидания. Такой объект автоматически генерируется JVM при входе в синхронизированный блок. Synchronized получает какой-то параметр в скобках, который является объектом, для которого создается монитор. Вы не можете эффективно синхронизировать потоки без мониторов в Java. - person Krzysztof Jabłoński; 19.09.2012