写在前面
本例主要介绍如何基于 inotify
实现对设备文件目录的实时监控,并通过 libcurl
客户端库采用 HTTP
协议将新增文件上传至服务器。
客户端主要步骤,Sender:
- 递归扫描旧文件,入队;
- 监听根目录与新建子目录,生成新文件后入队;
- 文件上传,消费者生产者模式,构造 HTTP POST 请求,使用
libcurl + curl_mime_*
提交; - 空目录清理,定时删除不为当天的、已经空的日期目录 。
服务端主要步骤,Reader:
- 启动 HTTP 服务器,监听
/upload
接口; - 解析上传的 relpath 和 file 字段,进行路径合法性检查;
- 创建保存目录并保存文件;
- 返回 JSON 响应,告知客户端保存结果。
inotify
文件监控机制:inotify。
inotify 为 Linux 内核级文件系统事件监控机制,用于监控文件系统事件。
使用流程:
- 调用 inotify_init() 创建一个 inotify 实例(返回一个文件描述符)。
- 通过 inotify_add_watch() 添加需要监听的文件或目录及其事件。
- 程序阻塞在 read() 上等待内核事件通知。
- 事件触发时,内核会将事件写入文件描述符,应用程序读取后进行处理。
#include <sys/inotify.h>
/*
struct inotify_event
{
int wd; // Watch descriptor.
uint32_t mask; // Watch mask.
uint32_t cookie; // Cookie to synchronize two events.
uint32_t len; // name 字段的长度。
char name[]; // Name.
};
*/
// 设置监控的目标目录路径
const fs::path WATCH_ROOT = fs::path(getenv("HOME")) / "lsj/TrafficServer_YunNan/release/alarm";
// 定义监控事件缓冲区大小。缓冲区可接收16个事件,256为inotify_event结构体中char name[] 的大小。
const size_t EVENT_BUF_LEN = 16 * (sizeof(struct inotify_event) + 256);
/* ---------- inotify 监听 ---------- */
void watch_dir(int inotify_fd, int wd) {
// 创建缓冲区,buf存放从内核读出的原始事件数据
char buf[EVENT_BUF_LEN];
while (true) {
// 监听循环,read 阻塞模式,没有事件时会一直阻塞
int len = read(inotify_fd, buf, sizeof(buf));
if (len <= 0) continue; // 读取失败处理
int i = 0;
while (i < len) { // 遍历读取到的事件
// 把 char* 类型强转为 inotify_event* 类型,用于解析单个事件
auto* event = (struct inotify_event*)&buf[i];
// event->len 文件名长度。event->mask & IN_CLOSE_WRITE 判断事件类型是文件写入后关闭
if (event->len && (event->mask & IN_CLOSE_WRITE)) {
// 构造完整路径。
fs::path file_path = WATCH_ROOT / event->name;
if (file_path.extension() == ".mp4" || file_path.extension() == ".jpg" || file_path.extension() == ".png") {
enqueue_file(file_path);
}
}
// 移动 i 指针到下一个事件的起始位置:inotify_event结构体size + name文件名的size。
i += sizeof(struct inotify_event) + event->len;
}
}
}
int main(int argc, char* argv[]) {
// 创建一个 inotify 实例
int inotify_fd = inotify_init();
// 添加需要监听的目录及其事件。IN_CLOSE_WRITE:关闭打开写的文件。
int wd = inotify_add_watch(inotify_fd, WATCH_ROOT.c_str(), IN_CLOSE_WRITE);
// 启动事件监听主循环。
watch_dir(inotify_fd, wd);
}
libcurl
libcurl 是一个强大的跨平台网络传输库,支持 HTTP、HTTPS、FTP 等协议。其 API 分为三类接口,适用于不同场景:
接口类型 | 特点 | 适用场景 |
---|---|---|
Easy Interface | 同步阻塞式、简单易用 | 单任务简单请求(GET/POST 等) |
Multi Interface | 非阻塞异步、多任务并行 | 高并发、大量同时连接 |
Share Interface | 共享数据(如 DNS 缓存、SSL 会话) | 多线程间资源共享 |
全局初始化与清理:
curl_global_init(long flags)
:初始化libcurl,设置全局选项(如SSL支持等),必须在其他函数之前调用一次。curl_global_cleanup()
:清理libcurl使用的全局资源,在程序结束前调用。
easy接口相关
curl_easy_init()
:创建一个easy句柄。curl_easy_cleanup()
:清理并销毁easy句柄。curl_easy_setopt()
:设置传输选项(如URL、回调函数、请求头等),配置请求的核心函数。curl_easy_perform()
:执行单个传输请求(同步)。curl_easy_getinfo()
:获取传输相关的信息(如响应码、传输时间等)。
调用流程:curl_easy_init → easy_setopt → easy_perform → easy_cleanup
创建easy句柄、设置选项、执行传输、清理句柄。
multi接口相关
curl_multi_init()
:创建一个multi句柄。curl_multi_cleanup()
:清理multi句柄。curl_multi_add_handle()
:将easy句柄添加到multi句柄中。curl_multi_remove_handle()
:从multi句柄中移除一个easy句柄。curl_multi_perform()
:执行非阻塞的文件传输,需要循环调用直到所有传输完成。curl_multi_wait()
:等待一个或多个文件描述符的活动(用于多路复用I/O)。curl_multi_poll()
:类似curl_multi_wait
,但提供了更精确的超时控制(较新版本可用)。curl_multi_info_read()
:读取已完成传输的信息。
调用流程:curl_multi_init → multi_add_handle(easy) → (poll/socket_action 循环) → multi_remove_handle → multi_cleanup
创建多个easy句柄和一个multi句柄,将easy句柄添加到multi句柄中,然后执行事件循环,清除句柄。
share接口相关
curl_share_init()
:创建一个share句柄。curl_share_setopt()
:设置共享选项(如共享什么类型的数据)。curl_share_cleanup()
:清理share句柄。
调用流程:curl_share_init → share_setopt(lockfunc…) → easy_setopt(CURLOPT_SHARE, sh)
各函数详细参数信息请参考官方API:https://curl.se/libcurl/c/allfuncs.html
HTTP 请求数据格式
下来介绍一些常见的 HTTP 请求数据格式和对应的设置方式。
application/x-www-form-urlencoded
默认表单提交格式,数据编码为键值对(如 key1=value1&key2=value2
);数据会被 URL 编码(空格变 +,特殊字符转义)。不适合二进制数据(如文件上传)。
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, "name=foo&age=25");
优点:文本简短、无需边界、老旧浏览器友好
缺点:仅键值对;文件要 base64 拼字符串非常低效
application/json
提交 JSON 格式数据(常见于 REST API)。需手动设置 Content-Type 头。
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, "{\"name\":\"foo\",\"age\":25}");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, "Content-Type: application/json");
特点:天然支持嵌套数组/对象;易于跨语言反序列化;REST/GraphQL/微服务首选。
multipart/form-data
用于混合文本字段和文件上传,需通过 curl_mime API(旧版用 curl_formadd,已废弃)。
curl_mime *mime = curl_mime_init(curl);
curl_mimepart *part;
// 添加文本字段
part = curl_mime_addpart(mime);
curl_mime_name(part, "username"); // 设置字段名
curl_mime_data(part, "testuser", CURL_ZERO_TERMINATED); // 设置文本数据。
// 添加文件字段
part = curl_mime_addpart(mime);
curl_mime_name(part, "file");
curl_mime_filedata(part, "test.jpg"); // 设置文件路径。
// 设置 MIME 数据并执行
curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
curl_easy_perform(curl);
// 清理
curl_mime_free(mime);
application/octet-stream
直接上传二进制数据(如文件内容)。编码方式:纯二进制流。
FILE *file = fopen("data.bin", "rb");
curl_easy_setopt(curl, CURLOPT_READDATA, file);
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, "Content-Type: application/octet-stream");
特点:不需要任何边界或编码;最节省带宽。服务器端直接按字节写文件,无需解析。
适合场景:OSS/S3 PUT /object
;流媒体分片(DASH、HLS chunk);大文件直传(后台从 stdin 流式读取)。
常用的c++ http库
- libcurl:最常用、功能全面、支持 HTTP/HTTPS、FTP 等,支持 multipart/form-data 上传,常用作客户端。官网:https://curl.se/libcurl/
- cpp-httplib:轻量级单头文件库,支持 HTTP/HTTPS,易用,适合嵌入式和快速开发。官网:https://github.com/yhirose/cpp-httplib
- Boost.Beast:基于 Boost.Asio,支持 HTTP 1.x/2,现代 C++ 接口,适合高性能场景。官网:https://www.boost.org/doc/libs/develop/libs/beast/doc/html/index.html
- Poco::Net:集成在 POCO 框架中,支持 HTTP/HTTPS、WebSocket、REST 等。官网:https://docs.pocoproject.org/current/Poco.Net.html
- cpr:基于 libcurl 的更高级封装,接口类似 Python 的 requests,简单易用。
程序示例
client.cpp
#include <curl/curl.h>
#include <sys/inotify.h>
#include <unistd.h>
#include <filesystem>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <regex>
#include <iostream>
namespace fs = std::filesystem;
using namespace std::chrono;
const fs::path WATCH_ROOT = fs::path(getenv("HOME")) / "path/Filse";
const size_t EVENT_BUF_LEN = 16 * (sizeof(struct inotify_event) + 256);
static int inotify_fd;
static std::unordered_map<int, std::string> wd2path; // wd -> path
static std::unordered_map<std::string, int> path2wd; // path -> wd
/* ---------- 线程安全队列 ---------- */
std::queue<fs::path> q;
std::mutex mtx;
std::condition_variable cv;
void enqueue(const fs::path& p){ std::lock_guard g(mtx); q.push(p); cv.notify_one(); }
/* ---------- 扫描历史文件 ---------- */
void scan_existing() {
for (auto& p : fs::recursive_directory_iterator(WATCH_ROOT)) {
if (!p.is_regular_file()) continue;
auto ext = p.path().extension();
if (ext == ".mp4" || ext == ".jpg")
enqueue(p.path()); // 放进队列
}
}
/* ---------- 子目录监听 ---------- */
void add_date_watch(const fs::path& d){
const std::string pstr = d.string(); // ② 提前转一次,复用
if (path2wd.count(pstr)) {return;}
int wd = inotify_add_watch(inotify_fd, d.c_str(), IN_CLOSE_WRITE | IN_MOVED_TO | IN_DELETE_SELF | IN_IGNORED);
if (wd == -1) {
perror("inotify_add_watch");
return;
}
wd2path[wd] = pstr; path2wd[pstr] = wd;
}
/* ---------- 初始化 ---------- */
void init_watches() {
inotify_fd = inotify_init1(IN_NONBLOCK);
const std::array<std::string,2> LEVEL1 = {"video", "pic"};
for (auto& sub : LEVEL1) {
fs::path dir = WATCH_ROOT / sub;
int wd = inotify_add_watch(inotify_fd, dir.c_str(), IN_CREATE | IN_ISDIR | IN_DELETE_SELF | IN_IGNORED | IN_MOVED_TO | IN_CLOSE_WRITE);
if (wd == -1) { // 出错要检查
perror("inotify_add_watch");
continue;
}
std::string dstr = dir.string();
wd2path[wd] = dstr; path2wd[dstr] = wd;
// 如果该目录里已有子目录,递增挂 watch
for (auto& e : fs::directory_iterator(dir)){
if (e.is_directory()) { add_date_watch(e.path());}
}
}
}
/* ---------- inotify 监听 ---------- */
void watch_loop() {
std::array<char,4096> buf{};
while (true) {
int len = read(inotify_fd, buf.data(), buf.size());
if (len <= 0) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); continue; }
for (int i=0;i<len; ){
auto* ev = (inotify_event*)(buf.data()+i);
fs::path parent = wd2path[ev->wd];
fs::path full = parent / ev->name;
/* 目录被 rm 掉:释放 watch */
if (ev->mask & (IN_DELETE_SELF|IN_IGNORED)) {
inotify_rm_watch(inotify_fd, ev->wd);
const std::string pstr = parent.string(); // ←① 转成 string
path2wd.erase(pstr);
wd2path.erase(ev->wd);
}
/* 新目录创建监听 */
else if ((ev->mask & IN_ISDIR) && (ev->mask & IN_CREATE)) {
add_date_watch(full);
}
/* 文件写完 / rename 进入日期目录 */
else if (ev->mask & (IN_CLOSE_WRITE|IN_MOVED_TO)) {
auto ext=full.extension();
if(ext==".mp4"||ext==".jpg"||ext==".png") enqueue(full);
}
i += sizeof(inotify_event)+ev->len;
}
}
}
/* ---------- HTTP 上传 ---------- */
bool http_post(const std::string& url, const fs::path& file, const std::string& relpath) {
std::cout << "\n[HTTP-DEBUG] 准备上传:"
<< "\n file = " << file
<< "\n relpath = " << relpath << '\n';
CURL* curl = curl_easy_init();
if (!curl) return false;
curl_mime* mime = curl_mime_init(curl);
/* ① 文件 part */
curl_mimepart* part = curl_mime_addpart(mime);
curl_mime_name(part, "file");
curl_mime_filedata(part, file.c_str());
/* ② 相对路径 part */
part = curl_mime_addpart(mime);
curl_mime_name(part, "relpath");
curl_mime_data(part, relpath.c_str(), CURL_ZERO_TERMINATED);
curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 20L);
CURLcode res = curl_easy_perform(curl);
curl_mime_free(mime);
curl_easy_cleanup(curl);
return res == CURLE_OK;
}
/* ---------- 发送线程 ---------- */
void sender_thread(const std::string& url) {
while (true) {
std::unique_lock<std::mutex> ul(mtx);
cv.wait(ul, [] { return !q.empty(); });
fs::path p = q.front(); q.pop();
ul.unlock();
// 计算相对于WATCH_ROOT的相对路径
std::string rel = fs::relative(p, WATCH_ROOT).parent_path().string();
std::cout << "[Sender] prepare → " << rel << '\n';
if (http_post(url, p, rel)) {
try {
// fs::remove(p); // 上传成功可删除原文件
} catch (std::exception& e) {
std::cerr << "remove error: " << e.what() << '\n';
}
}
else {
std::cerr << "Upload failed " << p << ", retry in 10s\n";
std::this_thread::sleep_for(std::chrono::seconds(10));
enqueue(p); // 重试
}
}
}
/* ---------- 定时清洗 ---------- */
void cleaner_thread() {
using namespace std::chrono;
while (true) {
std::this_thread::sleep_for(hours(1)); // 每小时跑一次
}
}
int main(int argc, char* argv[]) {
const std::string url = "http://xxx.xxx.xxx.xxx:8080/upload";
curl_global_init(CURL_GLOBAL_ALL);
init_watches();
scan_existing(); // ① 先把已有文件丢进队列
std::thread t(sender_thread, url);
watch_loop();
t.join();
curl_global_cleanup();
std::thread(cleaner_thread).detach();
}
service.cpp
#include "httplib.h" // 单头文件
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iostream>
namespace fs = std::filesystem;
using namespace std::chrono;
fs::path SAVE_ROOT{R"(D:\Files)"}; // Windows 示例
int main(int argc, char *argv[]) {
int port = (argc >= 2) ? std::stoi(argv[1]) : 8080;
httplib::Server svr;
svr.Post("/upload", [](const httplib::Request &req, httplib::Response &res) {
// std::cout << "----- params ------\n";
// for (auto &p : req.params)
// std::cout << p.first << " = " << p.second << '\n';
// std::cout << "----- files -------\n";
// for (auto &f : req.files)
// std::cout << f.first << " (size=" << f.second.content.size() << ")\n";
// 获取相对路径参数
std::string relpath;
if (req.has_param("relpath")) {
relpath = req.get_param_value("relpath");
} else if (req.has_file("relpath")) { // ← 兼容当前格式
relpath = req.get_file_value("relpath").content;
}
// 路径安全检查:禁止上级目录遍历
if (relpath.find("..") != std::string::npos) {
res.status = 400;
res.set_content(R"({"err":"invalid path"})", "application/json");
return;
}
// 取文件字段
if (!req.has_file("file")) {
res.status = 400;
res.set_content(R"({"err":"no file"})", "application/json");
return;
}
const auto &file = req.get_file_value("file");
// 构建完整保存路径
fs::path dst_dir = SAVE_ROOT / relpath;
std::cout << dst_dir << std::endl;
std::cout << relpath << std::endl;
fs::create_directories(dst_dir);
fs::path dst = dst_dir / file.filename;
std::ofstream ofs(dst, std::ios::binary);
ofs.write(file.content.c_str(), file.content.size());
ofs.close();
res.set_content("{\"saved\":\"" + dst.string() + "\"}", "application/json");
});
std::cout << "Receiver listening on 0.0.0.0:" << port << " (root=" << SAVE_ROOT << ")\n";
svr.listen("0.0.0.0", port);
}