前言
在多线程开发任务中,不同线程之间的数据可以通过回调函数进行传输,如下例:
class Base : public QThread {
public:
void Init() {
model->Init(); // 模型初始化
// 注册回调
model->registerCallback(std::bind(&Base::getModelResCallback, this));
}
void run() {
imgSend(); // 数据发送
resProcess(); // 结果处理
};
private:
cv::Mat getImg(); // 获取待推理图像
void getModelResCallback(int inferRes) {
imgInferRes.emplace_back(inferRes); // 获取回调结果
};
void imgSend() {
cv::Mat im = getImg();
model->addImg(im);
}
void resProcess();
private:
std::vector<int> imgInferRes;
Model *model;
};
class Model : public QThread {
public:
void Init(); // 模型初始化
void run() {
while (true) {
if (!imgQueue.empty()) {
inferRes.push(forward(imgQueue.front()));
imgQueue.pop();
}
}
};
void addImg(cv::Mat m_img) {
imgQueue.push(m_img);
};
void registerCallback(std::function<void(int modelRes)> callback) {
inferRes.front() = callback;
inferRes.pop();
};
private:
std::queue<int> inferRes;
std::queue<int> imgQueue;
};
当线程数量变多,处理逻辑变复杂之后,整个程序的耦合度较高,因为程序的处理逻辑为:Base —(data)—> Model,Model —(res)—> Base,Base—>process(res),这样一来一回一处理,在调试程序时想分模块检查时就会比较困难,而且一个操作不当就会发生容器竞争现象;
为了将程序逻辑改为:Base —(data)—> Model —(res)—> Process 这样的线性处理方式,可以采用生产者与消费者模式;生产者与消费者详细教程请参考:生产者与消费者
class Base : public QThread {
public:
void Init() {
model->Init();
}
void run() {
while (true) {
imgSend();
}
};
private:
Model *model;
cv::Mat getImg(); // 获取待推理图像
void imgSend() {
cv::Mat im = getImg();
model->addImg(im);
}
};
class Model : public QThread {
public:
void Init() {
process->Init(); // 后处理初始化
};
void run() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
if (!imgQueue.empty()) {
process->addInferRes(forward(imgQueue.front()));
imgQueue.pop();
}
}
};
void addImg(cv::Mat m_img) {
std::lock_guard<std::mutex> lock(mtx);
imgQueue.push(m_img);
cv.notify_all();
};
private:
Process *process;
std::queue<int> imgQueue;
std::mutex mtx;
std::condition_variable cv;
};
class Process : public QThread {
public:
void Init();
void run() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
if (!resQueue.empty()) {
resProcess(resQueue.front());
resQueue.pop();
}
}
};
void addInferRes(int inferRes) {
std::lock_guard<std::mutex> lock(mtx);
resQueue.push(inferRes);
};
private:
void resProcess(int res);
std::queue<int> resQueue;
std::mutex mtx;
std::condition_variable cv;
};