несколько буферов с использованием потоков

Мне нужна помощь по алгоритму многопоточной программы, которую я пишу. Это в основном команда cp в unix, но с потоком чтения и потоком записи. Я использую семафоры для синхронизации потоков. У меня есть структуры для данных буфера и потока, определенные как

struct bufType {
    char buf[BUFFER_SIZE];
    int numBytes;
};

struct threadData {
    int fd;
    bufType buf;
};

и глобальный массив bufType. Код для моего основного

int main(int argc, const char * argv[])
{
    int in, out;
    pthread_t Producer, Consumer;
    threadData producerData, consumerData;

    if (argc != 3)
    {
        cout << "Error: incorrect number of params" << endl;
        exit(0);
    }
    if ((in = open(argv[1], O_RDONLY, 0666)) == -1)
    {
        cout << "Error: cannot open input file" << endl;
        exit(0);
    }
    if ((out = open(argv[2], O_WRONLY | O_CREAT, 0666)) == -1)
    {
        cout << "Cannot create output file" << endl;
        exit(0);
    }

    sem_init(&sem_empty, 0, NUM_BUFFERS);
    sem_init(&sem_full, 0, 0);

    pthread_create (&Producer, NULL, read_thread, (void *) &producerData);
    pthread_create (&Consumer, NULL, write_thread, (void *) &consumerData);

    pthread_join(Producer, NULL);
    pthread_join(Consumer, NULL);

    return 0;
}

и читать и писать темы:

void *read_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    while((thread_data->buf.numBytes = slow_read(thread_data->fd, thread_data->buf.buf, BUFFER_SIZE)) != 0)
    {
        sem_post(&sem_full);
        sem_wait(&sem_empty);
    }

    pthread_exit(0);
}

void *write_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    sem_wait(&sem_full);
    slow_write(thread_data->fd, thread_data->buf.buf, thread_data->buf.numBytes);
    sem_post(&sem_empty);

    pthread_exit(0);
}

Итак, моя проблема заключается в том, что назначать моим переменным threadData в main и моей логике семафора в потоках чтения и записи. Я ценю любую помощь, которую вы можете оказать


person John Dodson    schedule 23.02.2015    source источник


Ответы (2)


Будучи парнем Windows, который не использует файловые дескрипторы, я могу ошибаться с входами и выходами, но я думаю, что это нужно сделать в вашем основном файле, чтобы настроить структуры threadData.

producerData.fd = in;
consumerData.fd = out;

Затем объявите ОДИН ЕДИНСТВЕННЫЙ объект типа bufType для обеих структур. Измените, например, определение threadData на

struct threadData {
    int fd;
    bufType* buf;
};

и в вашем Main вы пишете

bufType buffer;
producerData.buf = &buffer;
consumerData.buf = &buffer;

Тогда оба потока будут использовать общий буфер. В противном случае вы будете писать в буфер производителя данных, но буфер ConsumerData останется пустым (и именно здесь ваш поток записи ищет данные).

Затем вам нужно изменить логику сигнализации. Прямо сейчас ваша программа не может принимать ввод, превышающий BUFFER_SIZE, потому что ваш поток записи будет писать только один раз. Вокруг него должна быть петля. И тогда вам нужен какой-то механизм, который сигнализирует потоку записи, что данные больше не будут отправляться. Например, вы можете сделать это

void *read_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    while((thread_data->buf->numBytes = slow_read(thread_data->fd, thread_data->buf->buf, BUFFER_SIZE)) > 0)
    {
        sem_post(&sem_full);
        sem_wait(&sem_empty);
    }
    sem_post(&sem_full); // Note that thread_data->buf->numBytes <= 0 now

    pthread_exit(0);
}

void *write_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;


    sem_wait(&sem_full);
    while (thread_data->buf->numBytes > 0)
    {
        slow_write(thread_data->fd, thread_data->buf->buf, thread_data->buf->numBytes);
        sem_post(&sem_empty);
        sem_wait(&sem_full);
    }
    pthread_exit(0);
}

Надеюсь ошибок больше нет, решение не проверял. Но концепция должна быть такой, о которой вы просили.

person antipattern    schedule 23.02.2015
comment
Думаю, это поставит меня на правильный путь. Единственная проблема, мне нужно использовать несколько буферов, и я забыл включить это в вопрос. Таким образом, массив bufType - person John Dodson; 23.02.2015
comment
Я не понимаю, как это должно работать с несколькими буферами. Оба потока должны совместно использовать буфер, иначе они не смогут взаимодействовать - person antipattern; 23.02.2015
comment
Или вы имеете в виду, что есть два буфера, один из которых записывается, а другой читается, а затем они меняются местами? - person antipattern; 23.02.2015
comment
в основном мы пытаемся подражать проблеме обедающих философов; у нас есть пул общих ресурсов. read получает первый доступный пустой буфер и создает, а write получает первый полный буфер и потребляет его. И процесс синхронизируется с семафорами - person John Dodson; 23.02.2015
comment
Может быть, это так же просто, как присвоить данные потока с индексом массива в цикле в main? Я еще не пробовал, потому что не работал с одним буфером. - person John Dodson; 23.02.2015
comment
Я думаю, это не так просто. Эта проблема значительно сложнее, чем простой паттерн производитель-потребитель. - person antipattern; 24.02.2015

Вы можете использовать общий буферный пул, либо круговой массив, либо связанные списки. Вот ссылка на zip-файл примера Windows, который похож на то, что вы спрашиваете, используя связанные списки как часть системы обмена сообщениями между потоками для буферизации данных. Помимо создания мьютексов, семафоров и потока записи, функции небольшие и простые. mtcopy.zip .

person rcgldr    schedule 23.02.2015