生产者消费者


前言

生产者与消费者模式个人理解主要用于不同线程之间的数据传输,可以将生产者看作一个线程P,消费者看作一个线程C,再加上一个数据容器D,生产者P不断向容器D里面生产数据,容器盛满时停止生产,消费者C不断从容器D里面消费数据,容器为空时停止消费;

命名约定:生产者Prod、消费者Cons、数据容器Data;

基础模式

#include <iostream>
#include <deque>
#include <condition_variable>
#include <thread>

std::deque<int> dq; // 数据容器Data
std::condition_variable cv;
std::mutex m;
unsigned long long max_length = 50; // 容器容量

/*
 * Prod:当Data中数据满了时,Prod线程停止生产;当Data中没有数据或者没满时,Prod线程一直生产;
 */
void producer() {
    int i = 0;     // 生产数据;
    while (true) { // 此处可以改为 while(i < 100) 这样就不会无休止执行下去了;
        std::unique_lock<std::mutex> lock(m);
        // 容器满就停止生产
        cv.wait(lock, [] { return dq.size() < max_length; });

        std::cout << "Producing: " << i << std::endl;
        dq.push_back(i++);
        // 通知消费者线程,所有(notify_all)或单个(notify_one);
        cv.notify_one();
        // 模拟生产时间
        std::this_thread::sleep_for(std::chrono::milliseconds(40));
        // 程序执行完后会自动解锁互斥,当然,也可以手动设置;
    }
}

/*
 * Cons:当Data中数据为空时,Cons线程停止消费;当Data中有数据时,Prod线程一直消费;
 */
void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(m);
        // 容器为空就停止消费
        cv.wait(lock, [] { return !dq.empty(); });

        int data = dq.front();
        dq.pop_front();
        std::cout << "Consuming: " << data << std::endl;
        // 通知生产者线程
        cv.notify_one();
        // 模拟消费时间
        std::this_thread::sleep_for(std::chrono::milliseconds(35));
    }
}

int main() {
    std::thread Prod(producer); // 生产者
    std::thread Cons(consumer); // 消费者
    Prod.join();
    Cons.join();
    return -1;
}

多对多

上例基础模式为一个生产者对应一个消费者;除此之外还有:多个生产者对应一个消费者、一个生产者对应多个消费者、多个生产者对应多个消费者;在这些模式中,消费者如果想知道其中一条数据属于哪个生产者可以通过给每条数据加上生产者id来实现;

#include <iostream>
#include <deque>
#include <condition_variable>
#include <thread>

std::deque<std::pair<int, int>> dq; // 数据容器Data
std::condition_variable cv;
std::mutex m;
unsigned long long max_length = 50; // 容器容量
bool done = false;                  // 用来标记Prod是否生产结束;

void producer(int id) {
    int i = 0;        // 生产数据;
    while (i < 100) { 
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [] { return dq.size() < max_length; });
        std::cout << id << " Producing: " << i << std::endl;
        dq.push_back(std::make_pair(i++, id));
        lock.unlock();	// 数据送入容器中后就可以解锁互斥了
        cv.notify_one();
        // 模拟生产时间
        std::this_thread::sleep_for(std::chrono::milliseconds(40));
    }
    // 创建互斥锁
    std::unique_lock<std::mutex> lock(m);
    done = true;     // 生产者不再生产了,将标记置为true
    lock.unlock();   // 解锁互斥
    cv.notify_one(); // 通知消费者线程;
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(m);
        // 容器为空就停止消费,或者生产者不生产了
        cv.wait(lock, [] { return !dq.empty() || done; });
        // 当生产者不生产,且数据容器中没有数据时,消费者线程执行结束;
        if (dq.empty() && done) {
            break;
        }
        std::pair<int, int> data = dq.front();
        dq.pop_front();
        std::cout << "Consuming: id " << data.second << " value " << data.first << std::endl;
        lock.unlock(); // 从容器中拿出数据后就可以解锁互斥了,将上面打印写在锁内是为了输出对齐;
        // 通知生产者线程
        cv.notify_one();
        // 模拟消费时间
        std::this_thread::sleep_for(std::chrono::milliseconds(35));
    }
}

int main() {
    std::thread Prod1(producer, 1); 
    std::thread Prod2(producer, 2); 
    std::thread Cons(consumer);     
    Prod1.join();
    Prod2.join();
    Cons.join();
    return -1
}

数据回传

当消费者消费数据后,如果想将处理后的结果回传给对应的生产者,可以利用共享队列来实现;

#include <iostream>
#include <deque>
#include <condition_variable>
#include <thread>

// 数据结构体
struct Data {
    int value;
    std::pair<int, int> res; // <生产者id,value对应的结果>

    Data(int v) :
        value(v) {
    }
};

std::deque<std::pair<int, int>> dq; // 数据容器Data,其实这个队列就是共享队列,因为它是全局量;
std::deque<Data> res_dq;            // 结果容器Data;
std::condition_variable cv;
std::mutex m;
unsigned long long max_length = 50; // 容器容量
bool done = false;                  // 用来标记Prod是否生产结束;

void producer(int id) {
    int i = 0;        // 生产数据;
    while (i < 100) { 
        std::unique_lock<std::mutex> lock(m);
        // 容器满就停止生产
        cv.wait(lock, [] { return dq.size() < max_length; });
        std::cout << id << " Producing: " << i << std::endl;
        dq.push_back(std::make_pair(i++, id)); 
        lock.unlock();                         
        cv.notify_one();
        // 模拟生产时间
        std::this_thread::sleep_for(std::chrono::milliseconds(40));
    }
    // 创建互斥锁
    std::unique_lock<std::mutex> lock(m);
    done = true;    
    lock.unlock();   
    cv.notify_one(); 
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(m);
        // 容器为空就停止消费,或者生产者不生产了
        cv.wait(lock, [] { return !dq.empty() || done; });
        // 当生产者不生产,且数据容器中没有数据时,消费者线程执行结束;
        if (dq.empty() && done) {
            break;
        }
        std::pair<int, int> data = dq.front();
        dq.pop_front();
        std::cout << "Consuming: id " << data.second << " value " << data.first << std::endl;
        lock.unlock(); 
        // 通知生产者线程
        cv.notify_one();
        // 模拟消费时间
        std::this_thread::sleep_for(std::chrono::milliseconds(35));
        // 消费者处理数据并存储在另一个容器中,回传结果给生产者;
        int res = data.first * 2;
        Data item(data.first);
        item.res = std::make_pair(data.second, res);
        res_dq.push_back(item);
    }
}

int main() {
    std::thread Prod1(producer, 1); // 生产者
    std::thread Prod2(producer, 2); // 生产者
    std::thread Cons(consumer);     // 消费者
    Prod1.join();
    Prod2.join();
    Cons.join();
    return -1;
}

Ping-Pong模式

“ping-pong” 模式通常用于两个线程之间交替执行某个任务。在这个模式中,一个线程执行完毕后通知另一个线程开始执行,然后等待另一个线程执行完毕再继续,如此往复,就像乒乓球一样。

#include <iostream>
#include <deque>
#include <condition_variable>
#include <thread>

std::condition_variable cv;
std::mutex m;
bool ping = true; // 用来标记表示轮到哪个线程执行;
int count = 0;    // 数据容器Data,用来记录 ping-pong 的次数;

void pinger() {
    while (true) { 
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [] { return ping; });
        std::cout << "Ping: " << ++count << std::endl;
        ping = false;    // ping 之后置为false,就该pong了;
        cv.notify_one(); // 通知pong线程
        // 模拟反应时间
        std::this_thread::sleep_for(std::chrono::milliseconds(40));
    }
}

void ponger() {
    while (true) {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [] { return !ping; });
        std::cout << "Pong: " << count << std::endl;
        ping = true;     // pong 之后置为true,就该ping了;
        cv.notify_one(); // 通知ping线程
        // 模拟反应时间
        std::this_thread::sleep_for(std::chrono::milliseconds(35));
    }
}

int main() {
    std::thread Ping(pinger); // ping
    std::thread Pong(ponger); // pong
    Ping.join();
    Pong.join();
    return -1;
}

总结

上面四例描述了生产者与消费者模式的基本操作流程,在实际应用中可以将producer()、consumer()方法封装为类,std::deque dq 容器里面里面也可以传递对象来实现更复杂的业务流程;

此处部分内容来源于Mr.Q的指导,感谢Mr.Q


文章作者: LSJune
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 LSJune !
评论
  目录