Jetson文件自动上传


写在前面

本例主要介绍如何基于 inotify 实现对设备文件目录的实时监控,并通过 libcurl 客户端库采用 HTTP 协议将新增文件上传至服务器。

客户端主要步骤,Sender:

  1. 递归扫描旧文件,入队;
  2. 监听根目录与新建子目录,生成新文件后入队;
  3. 文件上传,消费者生产者模式,构造 HTTP POST 请求,使用 libcurl + curl_mime_* 提交;
  4. 空目录清理,定时删除不为当天的、已经空的日期目录 。

服务端主要步骤,Reader:

  1. 启动 HTTP 服务器,监听 /upload 接口;
  2. 解析上传的 relpath 和 file 字段,进行路径合法性检查;
  3. 创建保存目录并保存文件;
  4. 返回 JSON 响应,告知客户端保存结果。

inotify

文件监控机制:inotify。

inotify 为 Linux 内核级文件系统事件监控机制,用于监控文件系统事件。

使用流程:

  1. 调用 inotify_init() 创建一个 inotify 实例(返回一个文件描述符)。
  2. 通过 inotify_add_watch() 添加需要监听的文件或目录及其事件。
  3. 程序阻塞在 read() 上等待内核事件通知。
  4. 事件触发时,内核会将事件写入文件描述符,应用程序读取后进行处理。
#include <sys/inotify.h>

/*
    struct inotify_event
    {
      int wd;			    // Watch descriptor.  
      uint32_t mask;	    // Watch mask.  
      uint32_t cookie;	    // Cookie to synchronize two events.  
      uint32_t len;		    // name 字段的长度。
      char name[];			// Name.
    };
*/

// 设置监控的目标目录路径
const fs::path WATCH_ROOT = fs::path(getenv("HOME")) / "lsj/TrafficServer_YunNan/release/alarm";
// 定义监控事件缓冲区大小。缓冲区可接收16个事件,256为inotify_event结构体中char name[] 的大小。
const size_t EVENT_BUF_LEN = 16 * (sizeof(struct inotify_event) + 256);

/* ---------- inotify 监听 ---------- */
void watch_dir(int inotify_fd, int wd) {
    // 创建缓冲区,buf存放从内核读出的原始事件数据
    char buf[EVENT_BUF_LEN];
    while (true) {
        // 监听循环,read 阻塞模式,没有事件时会一直阻塞
        int len = read(inotify_fd, buf, sizeof(buf));
        if (len <= 0) continue;		// 读取失败处理
        int i = 0;
        while (i < len) {	// 遍历读取到的事件
            // 把 char* 类型强转为 inotify_event* 类型,用于解析单个事件
            auto* event = (struct inotify_event*)&buf[i];
            // event->len 文件名长度。event->mask & IN_CLOSE_WRITE 判断事件类型是文件写入后关闭
            if (event->len && (event->mask & IN_CLOSE_WRITE)) {
                // 构造完整路径。
                fs::path file_path = WATCH_ROOT / event->name;
                if (file_path.extension() == ".mp4" || file_path.extension() == ".jpg" || file_path.extension() == ".png") {
                    enqueue_file(file_path);
                }
            }
            // 移动 i 指针到下一个事件的起始位置:inotify_event结构体size + name文件名的size。
            i += sizeof(struct inotify_event) + event->len;
        }
    }
}

int main(int argc, char* argv[]) {
    // 创建一个 inotify 实例
    int inotify_fd = inotify_init();
	// 添加需要监听的目录及其事件。IN_CLOSE_WRITE:关闭打开写的文件。
    int wd = inotify_add_watch(inotify_fd, WATCH_ROOT.c_str(), IN_CLOSE_WRITE);
    // 启动事件监听主循环。
    watch_dir(inotify_fd, wd);
}

libcurl

libcurl 是一个强大的跨平台网络传输库,支持 HTTP、HTTPS、FTP 等协议。其 API 分为三类接口,适用于不同场景:

接口类型 特点 适用场景
Easy Interface 同步阻塞式、简单易用 单任务简单请求(GET/POST 等)
Multi Interface 非阻塞异步、多任务并行 高并发、大量同时连接
Share Interface 共享数据(如 DNS 缓存、SSL 会话) 多线程间资源共享

全局初始化与清理:

  • curl_global_init(long flags):初始化libcurl,设置全局选项(如SSL支持等),必须在其他函数之前调用一次。

  • curl_global_cleanup():清理libcurl使用的全局资源,在程序结束前调用。

easy接口相关

  • curl_easy_init():创建一个easy句柄。

  • curl_easy_cleanup():清理并销毁easy句柄。

  • curl_easy_setopt():设置传输选项(如URL、回调函数、请求头等),配置请求的核心函数。

  • curl_easy_perform():执行单个传输请求(同步)。

  • curl_easy_getinfo():获取传输相关的信息(如响应码、传输时间等)。

调用流程:curl_easy_init → easy_setopt → easy_perform → easy_cleanup

创建easy句柄、设置选项、执行传输、清理句柄。

multi接口相关

  • curl_multi_init():创建一个multi句柄。

  • curl_multi_cleanup():清理multi句柄。

  • curl_multi_add_handle():将easy句柄添加到multi句柄中。

  • curl_multi_remove_handle():从multi句柄中移除一个easy句柄。

  • curl_multi_perform():执行非阻塞的文件传输,需要循环调用直到所有传输完成。

  • curl_multi_wait():等待一个或多个文件描述符的活动(用于多路复用I/O)。

  • curl_multi_poll():类似curl_multi_wait,但提供了更精确的超时控制(较新版本可用)。

  • curl_multi_info_read():读取已完成传输的信息。

调用流程:curl_multi_init → multi_add_handle(easy) → (poll/socket_action 循环) → multi_remove_handle → multi_cleanup

创建多个easy句柄和一个multi句柄,将easy句柄添加到multi句柄中,然后执行事件循环,清除句柄。

share接口相关

  • curl_share_init():创建一个share句柄。

  • curl_share_setopt():设置共享选项(如共享什么类型的数据)。

  • curl_share_cleanup():清理share句柄。

调用流程:curl_share_init → share_setopt(lockfunc…) → easy_setopt(CURLOPT_SHARE, sh)

各函数详细参数信息请参考官方API:https://curl.se/libcurl/c/allfuncs.html

HTTP 请求数据格式

下来介绍一些常见的 HTTP 请求数据格式和对应的设置方式。

application/x-www-form-urlencoded

默认表单提交格式,数据编码为键值对(如 key1=value1&key2=value2);数据会被 URL 编码(空格变 +,特殊字符转义)。不适合二进制数据(如文件上传)。

curl_easy_setopt(curl, CURLOPT_POSTFIELDS, "name=foo&age=25");

优点:文本简短、无需边界、老旧浏览器友好

缺点:仅键值对;文件要 base64 拼字符串非常低效

application/json

提交 JSON 格式数据(常见于 REST API)。需手动设置 Content-Type 头。

curl_easy_setopt(curl, CURLOPT_POSTFIELDS, "{\"name\":\"foo\",\"age\":25}");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, "Content-Type: application/json");

特点:天然支持嵌套数组/对象;易于跨语言反序列化;REST/GraphQL/微服务首选。

multipart/form-data

用于混合文本字段和文件上传,需通过 curl_mime API(旧版用 curl_formadd,已废弃)。

curl_mime *mime = curl_mime_init(curl);
curl_mimepart *part;

// 添加文本字段
part = curl_mime_addpart(mime);
curl_mime_name(part, "username");	// 设置字段名
curl_mime_data(part, "testuser", CURL_ZERO_TERMINATED);	// 设置文本数据。

// 添加文件字段
part = curl_mime_addpart(mime);
curl_mime_name(part, "file");
curl_mime_filedata(part, "test.jpg");	// 设置文件路径。

// 设置 MIME 数据并执行
curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
curl_easy_perform(curl);

// 清理
curl_mime_free(mime);

application/octet-stream

直接上传二进制数据(如文件内容)。编码方式:纯二进制流。

FILE *file = fopen("data.bin", "rb");
curl_easy_setopt(curl, CURLOPT_READDATA, file);
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, "Content-Type: application/octet-stream");

特点:不需要任何边界或编码;最节省带宽。服务器端直接按字节写文件,无需解析。

适合场景:OSS/S3 PUT /object ;流媒体分片(DASH、HLS chunk);大文件直传(后台从 stdin 流式读取)。

常用的c++ http库

程序示例

client.cpp

#include <curl/curl.h>
#include <sys/inotify.h>
#include <unistd.h>
#include <filesystem>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <regex>
#include <iostream>

namespace fs = std::filesystem;
using namespace std::chrono;

const fs::path WATCH_ROOT = fs::path(getenv("HOME")) / "path/Filse";
const size_t EVENT_BUF_LEN = 16 * (sizeof(struct inotify_event) + 256);

static int inotify_fd;
static std::unordered_map<int, std::string> wd2path;   // wd -> path
static std::unordered_map<std::string, int> path2wd;    // path -> wd

/* ---------- 线程安全队列 ---------- */
std::queue<fs::path> q;
std::mutex mtx;
std::condition_variable cv;
void enqueue(const fs::path& p){ std::lock_guard g(mtx); q.push(p); cv.notify_one(); }

/* ---------- 扫描历史文件 ---------- */
void scan_existing() {
    for (auto& p : fs::recursive_directory_iterator(WATCH_ROOT)) {
        if (!p.is_regular_file()) continue;
        auto ext = p.path().extension();
        if (ext == ".mp4" || ext == ".jpg")
            enqueue(p.path());          // 放进队列
    }
}

/* ---------- 子目录监听 ---------- */
void add_date_watch(const fs::path& d){
    const std::string pstr = d.string();               // ② 提前转一次,复用
    if (path2wd.count(pstr)) {return;}
    int wd = inotify_add_watch(inotify_fd, d.c_str(), IN_CLOSE_WRITE | IN_MOVED_TO | IN_DELETE_SELF | IN_IGNORED);
    if (wd == -1) {
        perror("inotify_add_watch");
        return;
    }
    wd2path[wd]  = pstr; path2wd[pstr] = wd;
}

/* ---------- 初始化 ---------- */
void init_watches() {
    inotify_fd = inotify_init1(IN_NONBLOCK);
    const std::array<std::string,2> LEVEL1 = {"video", "pic"};
    for (auto& sub : LEVEL1) {
        fs::path dir = WATCH_ROOT / sub;
        int wd = inotify_add_watch(inotify_fd, dir.c_str(), IN_CREATE | IN_ISDIR | IN_DELETE_SELF | IN_IGNORED | IN_MOVED_TO | IN_CLOSE_WRITE);
        if (wd == -1) {      // 出错要检查
            perror("inotify_add_watch");
            continue;
        }
        std::string dstr = dir.string();
        wd2path[wd] = dstr; path2wd[dstr] = wd; 
        // 如果该目录里已有子目录,递增挂 watch
        for (auto& e : fs::directory_iterator(dir)){
            if (e.is_directory()) { add_date_watch(e.path());}
        }
    }
}

/* ---------- inotify 监听 ---------- */
void watch_loop() {
    std::array<char,4096> buf{};
    while (true) {
        int len = read(inotify_fd, buf.data(), buf.size());
        if (len <= 0) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); continue; }
        for (int i=0;i<len; ){
            auto* ev = (inotify_event*)(buf.data()+i);
            fs::path parent = wd2path[ev->wd];
            fs::path full   = parent / ev->name;
            /* 目录被 rm 掉:释放 watch */
            if (ev->mask & (IN_DELETE_SELF|IN_IGNORED)) {
                inotify_rm_watch(inotify_fd, ev->wd);
                const std::string pstr = parent.string();   // ←① 转成 string
                path2wd.erase(pstr);
                wd2path.erase(ev->wd);
            }
            /* 新目录创建监听 */
            else if ((ev->mask & IN_ISDIR) && (ev->mask & IN_CREATE)) {
                add_date_watch(full);
            }
            /* 文件写完 / rename 进入日期目录 */
            else if (ev->mask & (IN_CLOSE_WRITE|IN_MOVED_TO)) {
                auto ext=full.extension();
                if(ext==".mp4"||ext==".jpg"||ext==".png") enqueue(full);
            }
            i += sizeof(inotify_event)+ev->len;
        }
    }
}

/* ---------- HTTP 上传 ---------- */
bool http_post(const std::string& url, const fs::path& file, const std::string& relpath) {
    std::cout << "\n[HTTP-DEBUG] 准备上传:"
          << "\n  file    = " << file
          << "\n  relpath = " << relpath << '\n';

    CURL* curl = curl_easy_init();
    if (!curl) return false;

    curl_mime* mime = curl_mime_init(curl);

    /* ① 文件 part */
    curl_mimepart* part = curl_mime_addpart(mime);
    curl_mime_name(part, "file");
    curl_mime_filedata(part, file.c_str());

    /* ② 相对路径 part */
    part = curl_mime_addpart(mime);
    curl_mime_name(part, "relpath");
    curl_mime_data(part, relpath.c_str(), CURL_ZERO_TERMINATED);

    curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
    curl_easy_setopt(curl, CURLOPT_TIMEOUT, 20L);

    CURLcode res = curl_easy_perform(curl);
    curl_mime_free(mime);
    curl_easy_cleanup(curl);
    return res == CURLE_OK;
}

/* ---------- 发送线程 ---------- */
void sender_thread(const std::string& url) {
    while (true) {
        std::unique_lock<std::mutex> ul(mtx);
        cv.wait(ul, [] { return !q.empty(); });
        fs::path p = q.front(); q.pop();
        ul.unlock();
        // 计算相对于WATCH_ROOT的相对路径
        std::string rel = fs::relative(p, WATCH_ROOT).parent_path().string();
        std::cout << "[Sender] prepare → " << rel << '\n';
        if (http_post(url, p, rel)) {
            try {
                // fs::remove(p); // 上传成功可删除原文件
            } catch (std::exception& e) {
                std::cerr << "remove error: " << e.what() << '\n';
            }
        }
        else {
            std::cerr << "Upload failed " << p << ", retry in 10s\n";
            std::this_thread::sleep_for(std::chrono::seconds(10));
            enqueue(p); // 重试
        }
    }
}

/* ---------- 定时清洗 ---------- */
void cleaner_thread() {
    using namespace std::chrono;
    while (true) {
        std::this_thread::sleep_for(hours(1));   // 每小时跑一次
    }
}

int main(int argc, char* argv[]) {
    const std::string url = "http://xxx.xxx.xxx.xxx:8080/upload";
    curl_global_init(CURL_GLOBAL_ALL);
    init_watches();
    scan_existing();                // ① 先把已有文件丢进队列

    std::thread t(sender_thread, url);
    watch_loop();

    t.join();
    curl_global_cleanup();
    std::thread(cleaner_thread).detach();
}

service.cpp

#include "httplib.h" // 单头文件
#include <chrono>
#include <filesystem>
#include <fstream> 
#include <iostream>

namespace fs = std::filesystem;
using namespace std::chrono;

fs::path SAVE_ROOT{R"(D:\Files)"}; // Windows 示例

int main(int argc, char *argv[]) {
    int port = (argc >= 2) ? std::stoi(argv[1]) : 8080;
    httplib::Server svr;

    svr.Post("/upload", [](const httplib::Request &req, httplib::Response &res) {
        // std::cout << "----- params ------\n";
        // for (auto &p : req.params)
        //     std::cout << p.first << " = " << p.second << '\n';

        // std::cout << "----- files -------\n";
        // for (auto &f : req.files)
        //     std::cout << f.first << " (size=" << f.second.content.size() << ")\n";
        // 获取相对路径参数
        std::string relpath;
        if (req.has_param("relpath")) {
            relpath = req.get_param_value("relpath");
        } else if (req.has_file("relpath")) { // ← 兼容当前格式
            relpath = req.get_file_value("relpath").content;
        }
        // 路径安全检查:禁止上级目录遍历
        if (relpath.find("..") != std::string::npos) {
            res.status = 400;
            res.set_content(R"({"err":"invalid path"})", "application/json");
            return;
        }
        // 取文件字段
        if (!req.has_file("file")) {
            res.status = 400;
            res.set_content(R"({"err":"no file"})", "application/json");
            return;
        }
        const auto &file = req.get_file_value("file");

        // 构建完整保存路径
        fs::path dst_dir = SAVE_ROOT / relpath;
        std::cout << dst_dir << std::endl;
        std::cout << relpath << std::endl;
        fs::create_directories(dst_dir);

        fs::path dst = dst_dir / file.filename;
        std::ofstream ofs(dst, std::ios::binary);
        ofs.write(file.content.c_str(), file.content.size());
        ofs.close();

        res.set_content("{\"saved\":\"" + dst.string() + "\"}", "application/json");
    });
    std::cout << "Receiver listening on 0.0.0.0:" << port << "  (root=" << SAVE_ROOT << ")\n";
    svr.listen("0.0.0.0", port);
}

文章作者: LSJune
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 LSJune !
评论
  目录