多线程数据传输


前言

在多线程开发任务中,不同线程之间的数据可以通过回调函数进行传输,如下例:

class Base : public QThread {
public:
    void Init() {
        model->Init(); // 模型初始化
        // 注册回调
        model->registerCallback(std::bind(&Base::getModelResCallback, this));
    }
    void run() {
        imgSend();    // 数据发送
        resProcess(); // 结果处理
    };

private:
    cv::Mat getImg(); // 获取待推理图像
    void getModelResCallback(int inferRes) {
        imgInferRes.emplace_back(inferRes); // 获取回调结果
    };
    void imgSend() {
        cv::Mat im = getImg();
        model->addImg(im);
    }
    void resProcess();

private:
    std::vector<int> imgInferRes;
    Model *model;
};

class Model : public QThread {
public:
    void Init(); // 模型初始化
    void run() {
        while (true) {
            if (!imgQueue.empty()) {
                inferRes.push(forward(imgQueue.front()));
                imgQueue.pop();
            }
        }
    };
    void addImg(cv::Mat m_img) {
        imgQueue.push(m_img);
    };
    void registerCallback(std::function<void(int modelRes)> callback) {
        inferRes.front() = callback;
        inferRes.pop();
    };

private:
    std::queue<int> inferRes;
    std::queue<int> imgQueue;
};

当线程数量变多,处理逻辑变复杂之后,整个程序的耦合度较高,因为程序的处理逻辑为:Base —(data)—> Model,Model —(res)—> Base,Base—>process(res),这样一来一回一处理,在调试程序时想分模块检查时就会比较困难,而且一个操作不当就会发生容器竞争现象;

为了将程序逻辑改为:Base —(data)—> Model —(res)—> Process 这样的线性处理方式,可以采用生产者与消费者模式;生产者与消费者详细教程请参考:生产者与消费者

class Base : public QThread {
public:
    void Init() {
        model->Init();
    }
    void run() {
        while (true) {
            imgSend();
        }
    };

private:
    Model *model;
    cv::Mat getImg(); // 获取待推理图像
    void imgSend() {
        cv::Mat im = getImg();
        model->addImg(im);
    }
};

class Model : public QThread {
public:
    void Init() {
        process->Init(); // 后处理初始化
    };
    void run() {
        while (true) {
            std::unique_lock<std::mutex> lock(mtx);
            if (!imgQueue.empty()) {
                process->addInferRes(forward(imgQueue.front()));
                imgQueue.pop();
            }
        }
    };
    void addImg(cv::Mat m_img) {
        std::lock_guard<std::mutex> lock(mtx);
        imgQueue.push(m_img);
        cv.notify_all();
    };

private:
    Process *process;
    std::queue<int> imgQueue;
    std::mutex mtx;
    std::condition_variable cv;
};

class Process : public QThread {
public:
    void Init();
    void run() {
        while (true) {
            std::unique_lock<std::mutex> lock(mtx);
            if (!resQueue.empty()) {
                resProcess(resQueue.front());
                resQueue.pop();
            }
        }
    };

    void addInferRes(int inferRes) {
        std::lock_guard<std::mutex> lock(mtx);
        resQueue.push(inferRes);
    };

private:
    void resProcess(int res);
    std::queue<int> resQueue;
    std::mutex mtx;
    std::condition_variable cv;
};

文章作者: LSJune
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 LSJune !
评论
  目录