Синхронизация потоков с помощью семафоров

Это был вопрос интервью, любая помощь будет оценена по достоинству.

Как вы синхронизируете два потока, из которых один увеличивает значение, а другой отображает его (PS поток, который отображает значение, должен отображать значение только тогда, когда это новое значение)

Ex : int x = 5;

T1 : увеличивает его до 6

T2: должен отображать 6 (только один раз) и должен отображать его снова, когда он становится 7

Я ответил, что буду использовать семафор примерно так:

int c=0; // variable that I used to synchronize

// In T1
if( c = 0 )
{
   c++;
   x++; // value that is incremented
}

// in T2
if( c == 1 )
{
   cout<<x;
   c--;
}

Затем он спросил, что бы вы сделали, если бы произошло переключение контекста с потока T1 на T2 после установки c на 1, но до увеличения x (поскольку в этом случае он вошел бы в P2 перед увеличением x)

Я не мог ответить на эту часть. Любая помощь будет оценена по достоинству.


person arkham knight    schedule 14.02.2018    source источник


Ответы (3)


Хорошее упражнение.

Вы не указали тег c++ в вопросе, но сам вопрос содержит cout<<x, поэтому вы, вероятно, проходили собеседование на должность C++. Несмотря на это, я собираюсь ответить на Java, так как это вопрос для собеседования, и язык не должен иметь большого значения, пока я избегаю использования чего-то слишком специфичного для Java.

Как указал ваш интервьюер, синхронизация должна происходить в обоих направлениях:

  • Поток печати должен ждать, пока увеличивающийся поток завершит свою работу.
  • Увеличивающий поток должен ждать, пока поток печати завершит свою работу.

Итак, нам нужно что-то, чтобы сообщить нам, что принтер готов (чтобы инкрементатор мог работать), и еще что-то, чтобы сообщить нам, что инкрементатор готов. Для этого я использовал два семафора:

Рабочая версия на Ideone

import java.util.concurrent.Semaphore;

class IncrementDemo {
    static int x = 0;

    public static void main(String[] args) {
        Semaphore incrementLock = new Semaphore(0);
        Semaphore printLock = new Semaphore(0);

        Thread incrementer = new Thread(() -> {
            for(;;) {
                incrementLock.acquire(); //Wait to be allowed to increment
                x++;
                printLock.release(); //Allow the printer to print
            }
        });

        Thread printer = new Thread(() -> {
            for (;;) {
                incrementLock.release(); //Let the incrementer to its job
                printLock.acquire(); //Wait to be allowed to print
                System.out.println(x);
            }
        });

        incrementer.setDaemon(false); //Keep the program alive after main() exits
        printer.setDaemon(false);

        incrementer.start(); //Start both threads
        printer.start();
    }

}

(Я удалил блоки try/catch вокруг acquire для удобочитаемости).

Выход:

1
2
3
4
5
6
7
...
person Malt    schedule 14.02.2018
comment
Это неправильно, по крайней мере, с pthread под капотом (который я верить, применимо и к Java): If a thread attempts to unlock a mutex that it has not locked or a mutex which is unlocked, undefined behavior results. И циклы занятости могут быть или не быть неправильными в зависимости от того, как работают барьеры памяти и другие оптимизации (в Java или C++). В любом случае правильный ответ - условные переменные. - person freakish; 14.02.2018
comment
@freakish AFAIK и Java, и pthread Семафоры (в отличие от мьютексов) могут быть освобождены и получены разными потоками. Код цикла занятости (который, очевидно, предназначен только для примера, и никто не должен его использовать) использует логические значения volatile и x, которые создают отношение «происходит до» в Java (docs.oracle.com/javase/tutorial/essential/concurrency/) - person Malt; 14.02.2018
comment
Разве incrementLock.release() не должен быть после оператора печати в потоке 2 - person arkham knight; 20.02.2018
comment
@user9355495 user9355495 Если оба семафора начинаются с 0 и оба потока начинают с получения разрешения, они оба будут ждать неопределенное время. Вы можете переместить acquire после печати, но вам придется инициализировать семафор на 1. - person Malt; 20.02.2018
comment
Спасибо, чувак, этот вопрос снова задали в Intel. На этот раз я ответил :) - person arkham knight; 22.02.2018

Это классический вариант использования переменной условия с небольшой заминкой, заключающейся в том, что значение можно легко обновить более одного раза в потоке 1, прежде чем поток 2 запустится для его обработки:

// In some scope common to both threads
int c_ = 0; // variable
std::mutex mutex_();
std::condition_variable cond_();

// Thread 1
{ 
    std::lock_guard<std::mutex> lock(mutex_);
    ++c_;
}
cond_.notify_one();

// Thread 2
{ 
    std::lock_guard<std::mutex> lock( mutex_ );
    int cLocal = c_;
    while ( !done ) { 
        cond_.wait( lock, [] { return c_ != cLocal; } );
        while ( cLocal++ < c_ ) 
            ... // Display new *local* value
    }
}
person Andy    schedule 14.02.2018

Проблемы:

Вообще есть две основные проблемы с параллельным кодом.

<сильный>1. Атомарность

Наименьшая степень детализации кода — это на самом деле не отдельные операции, такие как i++, а лежащие в их основе ассемблерные инструкции. Поэтому каждая операция, включающая запись, не может быть вызвана из нескольких потоков. (это сильно зависит от вашей целевой архитектуры, но x86, в отличие от arm64, очень ограничен)

Но, к счастью, c++ предоставляет операции std::atomic, которые дают вам хороший независимый от платформы способ изменения переменных из нескольких потоков.

<сильный>2. Согласованность

И компилятору, и процессору разрешено переупорядочивать любые инструкции до тех пор, пока сохраняется согласованность локального потока. Итак, что это значит?

посмотри на свою первую тему

if( c = 0 )
{
   c++;
   x++; // value that is incremented
}

У вас есть 3 операции c == 0, c++ и x++. Оба приращения не зависят друг от друга, поэтому компилятору будет разрешено менять их местами. Во время выполнения ядро ​​также может изменить их порядок, оставив вас в очень неопределенной ситуации. В последовательном мире это совершенно нормально и улучшает общую производительность (если только это не приводит к дырам в безопасности, таким как расплавление). К сожалению, ни компилятор, ни процессор не распознают параллельный код, поэтому любая оптимизация может сломать вашу параллельную программу.

Но опять же, C++ предоставляет встроенное решение этой проблемы, называемое std::memory_order, которое реализует конкретную модель согласованности.

Решения:

Простой мьютекс. Мьютекс — это простой, но мощный инструмент. Он решает проблемы с атомарностью и согласованностью, предоставляя так называемые критические секции, которые предотвращают параллельное выполнение. Это означает, что в данном примере оператор if в обоих потоках является последовательным и никогда не будет выполняться параллельно. Реализация работает, но есть недостаток. Если один из потоков работает очень медленно, другой будет тратить много процессорного времени на непрерывную проверку флага newValue.

#include <mutex>

std::mutex mutex;
int value = true;
bool newValue = false;

void producer_thread() {
   while(true) {
       std::lock_guard<std::mutex> lg(mutex);
        if (newValue == false) {
            value++;
            newValue = true;
        }
   }
}

void consumer_thread() {
   while(true) {
       std::lock_guard<std::mutex> lg(mutex);
        if (newValue == true) {
            std::cout << value;
            newValue = false;
        }
   }
}

Переменная условия:

Условная переменная — это, по сути, просто конструкция «ожидание уведомления». Вы можете заблокировать текущее выполнение, вызвав wait, пока другой поток не вызовет notify. Эта реализация будет сценарием перехода.

#include <mutex>
#include <condition_variable>

std::mutex mutex;
std::condition_variable cond;
int value = true;
bool newValue = false;

void producer() {
   while(true) {
       std::unique_lock<std::mutex> ul(mutex);

        while (newValue == true) {
            cond.wait(ul);
        }

        value++;
        newValue = true;
        cond.notify_all();
   }
}

void consumer() {
   while(true) {
       std::unique_lock<std::mutex> ul(mutex);

        while (newValue == false) {
            cond.wait(ul);
        }

        std::cout << value;
        newValue = false;
        cond.notify_all();
   }
}
person Domso    schedule 14.02.2018