UDPDK API 设计文档与本周进展

1. 系统背景

本系统通过 UDP 包接收外部采集设备发送的数据。采集设备会按照指定采样率进行采集,每一次采样会得到所有开启通道的 ADC 数据。

一次采样的结果可以理解为一个 sample point:

sample point
  ch0: ADC_SAMPLE
  ch1: ADC_SAMPLE
  ch2: ADC_SAMPLE
  ...

多个连续 sample point 组成一个 blob。由于一个 blob 通常较大,无法放入单个 UDP 包,因此发送端会将一个 blob 拆分为多个 slice,每个 UDP 包发送一个 slice。

数据关系如下:

blob
  slice 0
    sample point 0: ch0, ch1, ch2, ...
    sample point 1: ch0, ch1, ch2, ...
  slice 1
    sample point N: ch0, ch1, ch2, ...
    sample point N+1: ch0, ch1, ch2, ...

2. 核心概念

2.1 Sample Point

sample point 表示同一采样时刻下,所有开启通道的采样值集合。

例如,如果开启 160 个通道,并且每个通道数据类型为16位位宽的 ADC_SAMPLE

typedef uint16_t ADC_SAMPLE;

那么一个 sample point 的大小为:

160 * sizeof(ADC_SAMPLE)

2.2 Slice

slice 表示一个 UDP 包中携带的一段连续 sample point。

例如:

slice_size = 5
channel_num = 160

则一个 slice 中包含:

5 * 160 个 ADC_SAMPLE

slice 是网络传输层面的拆包单位。

2.3 Blob

blob 表示一次较完整的数据读取单元,由多个连续 sample point 组成。

例如:

blob_size = 1024
channel_num = 160

则一个 blob 中包含:

1024 * 160 个 ADC_SAMPLE

blob 是上层软件更常用的数据处理单位。

3. UDP Payload 设计

当前 dpdk_recv.c 中接收端将 UDP payload 前部解析为 DD_PayloadHeader

typedef struct {
    uint32_t data_blob_sample_cnt;
    uint32_t blob_size;
    uint16_t slice_size;
    uint8_t channel_enable;
    uint8_t downsampling;
    uint8_t job_id;
    uint8_t tmp[3];
} DD_PayloadHeader;

字段说明:

字段 含义
data_blob_sample_cnt 当前 slice 在数据流或 blob 中的采样点位置
blob_size 一个 blob 包含多少个 sample point
slice_size 当前 UDP 包包含多少个 sample point
channel_enable 当前开启通道数量
downsampling 降采样参数
job_id 当前采集任务编号
tmp 保留字段

当前接收端会根据 header 计算:

blob_id = data_blob_sample_cnt / blob_size + 1;
slice_id = data_blob_sample_cnt % blob_size / slice_size + 1;

目前这部分程序还对应旧版Header,没有同步现在更新的Payload Header格式。

4. 当前文件职责

4.1 dd.h

dd.h 是 DD 模块对外 API 声明文件。

它定义了:

  • ADC_SAMPLE
  • producer/consumer 模式
  • 初始化、配置、读写 slice、读写 blob 等 API

当前 API 总览:

int32_t dd_init(uint32_t mode, uint32_t ring_size, uint32_t job_id);
int32_t dd_status(uint32_t job_id);
int32_t dd_config(uint32_t channel_num, uint32_t blob_size, uint32_t stream_mode, uint32_t job_id);

void *dd_get_buf(uint32_t *index, uint32_t sample_num, uint32_t job_id);
int32_t dd_enqueue_slice(uint32_t start_index, uint32_t slice_num, uint32_t job_id);
ADC_SAMPLE *dd_dequeue_1slice(uint32_t job_id);

int32_t dd_get_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);
int32_t dd_put_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);
int32_t dd_get_blob(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
int32_t dd_get_blob_pop(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
int32_t dd_get_blob_sync(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
int32_t dd_get_blob_copy(ADC_SAMPLE *buf, uint32_t offset, uint32_t count);
uint32_t dd_count(uint32_t job_id);
uint32_t dd_close(void);

4.2 dd.c

dd.c 是 DD API 的核心实现。

它负责:

  • 创建或打开共享内存。
  • 初始化 buf_ring
  • 管理共享内存中的 sample[] 数据区。
  • 将接收到的 slice 索引写入 ring。
  • 向外部提供 slice/blob 读取能力。

共享内存路径:

/tmp/shm/dd.shm.<job_id>

共享内存定义的结构:

typedef struct {
    SHM_HEADER *pHeader;
    struct buf_ring *br;
    ADC_SAMPLE *sample;
    uint32_t sample_max_num;
    uint32_t sample_tail;
    uint32_t last_sample_tail;
    uint32_t sample_blob_last;
    uint32_t channel_num;
    uint32_t blob_size;
    uint32_t stream_mode;
    uint32_t blob_id;
    uint32_t blob_id_pop;
} DD_Context;

static DD_Context g_ctx[JOB_NUM_MAX];

在程序中,g_ctx[job_id]表示某一个采集任务 job_id 的运行上下文。例如,g_ctx[1]对应的就是job_id为1的采集任务中,共享内存、ring、采集数据区、blob状态等信息。

共享内存结构大致为:

共享内存起始地址
|
-> SHM_HEADER
    magic
    shm_len
    buf_ring br
      br_ring[]  // 存sample索引
  sample[]       // 存真正采集数据

4.2.1 pHeader

pHeader作为指针,指向共享内存头部SHM_HEADER,在shm.h里面有定义:

typedef struct {
    uint32_t magic;    // 校验共享内存是否有效
    uint32_t shm_len;    // 共享内存长度
    struct buf_ring br __attribute__((aligned(CACHE_LINE_SIZE)));   // 共享内存里的 ring 队列对象
} SHM_HEADER;

4.2.2 buf_ring与sample[]

buf_ring 是一个环形索引队列,里面每个元素记录 sample[] 中某个 sample point 的位置。 生产者把“新写好的 sample 位置”放进去,消费者从里面取出“可读取 sample 位置”。

sample[] 是一个连续的内存区域,存储真正的采集数据。 每个 sample point 的大小为 channel_num * sizeof(ADC_SAMPLE),即一个采样点所有开启通道的采集数据。

可以大概这样子去理解采样点采集数据存储的结构:

sample point 0: sample[0 ... channel_num-1]
sample point 1: sample[channel_num ... 2*channel_num-1]
sample point 2: sample[2*channel_num ... 3*channel_num-1]

buf_ring则存储各个采样点的索引。

4.2. tail标记

sample_tail:表示 sample[] 当前写入尾部位置的索引,生产者每次写入数据后都会更新这个索引。 last_sample_tail:记录上一次读取 blob 时看到的尾部位置,用于判断有没有新数据写入,如果last_sample_tail!=sample_tail,说明有新数据写入了,此时将会更新blob_id和last_sample_tail。

4.2. 其他参数

sample用于指向共享内存里面存储采集数据的区域。其中ADC_SAMPLE,表示一个通道一次采集得2字节数据。

channel_num、blob_size、stream_mode: 根据UDP payload header配置的通道数、一个blob包含的采样点数目、当前采集工作模式的相关参数。

blob_id: blob 编号计数器,高速当前读到的是第几个blob。

blob_id_pop: 表示通过pop方式消费了第几个blob,每次成功pop一个完整blob,就会:

g_ctx[job_id].blob_id_pop++;

4.3 dpdk_recv.c

dpdk_recv.c 是接收端主程序。

它负责:

  1. 初始化 UDPDK。
  2. 创建 UDPDK socket。
  3. 绑定接收端口。
  4. 循环调用 udpdk_recvfrom() 接收 UDP payload。
  5. 解析 DD_PayloadHeader
  6. 根据 job_id 初始化 DD producer。
  7. 将 slice 数据写入共享内存。
  8. 将 slice 中的采样点索引写入 ring。

当UDP包到来后:

  1. dd_get_buf() 从 sample[] 中分配一块写入区域;
  2. udpdk_recvfrom() 把 UDP payload 中的采集数据分成采样点,写到这块区域;
  3. dd_enqueue_slice() 把这个 slice 中每个采样点的位置写进 buf_ring。

4.4 dpdk_send.c

dpdk_send.c 是测试发送端程序,用于模拟采集设备发送 UDP 数据。

它负责:

  • 初始化 UDPDK。
  • 创建发送 socket。
  • 构造测试 DD_PayloadHeader
  • 填充模拟 ADC 数据。
  • 调用 udpdk_sendto() 发送 UDP 包。

5. 当前 API 分类设计

5.1 初始化与配置 API

dd_init

int32_t dd_init(uint32_t mode, uint32_t ring_size, uint32_t job_id);

功能:

  • 在 producer 模式下创建或初始化共享内存。
  • 在 consumer 模式下打开已有共享内存。
  • 根据 job_id 区分不同采集任务。

典型调用:

dd_init(MODE_PRODUCER, 0, job_id);
dd_init(MODE_CONSUMER, 0, job_id);

dd_config

int32_t dd_config(uint32_t channel_num, uint32_t blob_size, uint32_t stream_mode, uint32_t job_id);

功能:

  • 配置开启通道数。
  • 配置一个 blob 中 sample point 数量。
  • 记录 stream 模式参数。
  • 计算共享内存 sample 区容量。

dd_status

int32_t dd_status(uint32_t job_id);

功能:

  • 根据提供的job_id查询对应的buffer_ring的状态:使用,head,tail,丢弃

dd_count

uint32_t dd_count(uint32_t job_id);

功能:

  • 返回buffer ring 中当前可读 sample point 索引数量,用于对采集的采样点数据进行统计。

5.2 写入 API

dd_get_buf

void *dd_get_buf(uint32_t *index, uint32_t sample_num, uint32_t job_id);

功能:

  • 从共享内存 sample[] 中申请一段写入区域。
  • 返回写入地址。
  • 通过 index 返回起始 sample 索引。

这个 API 主要由 dpdk_recv.c 使用。接收端可以将 UDP payload 直接写入共享内存。

dd_enqueue_slice

int32_t dd_enqueue_slice(uint32_t start_index, uint32_t slice_num, uint32_t job_id);

功能:

  • 将一个 slice 中每个 sample point 的索引写入 ring。
  • 数据本身不进入 ring,ring 只保存索引。

dd_put_slice

int32_t dd_put_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);

功能:

  • 将调用方提供的 sample point 数据复制到共享内存。
  • 将写入位置发布到 ring。

5.3 Slice 读取 API

dd_get_slice

int32_t dd_get_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);

功能:

  • 从 ring 中读取 num_group 个 sample point,一般这里的num_group表示一个slice里面的采样点数目。
  • 每个 sample point 包含所有开启通道的数据。
  • 读取后 ring 消费者指针会推进。

适用连续数据采集下的数据读取。

适用场景:

  • 实时曲线显示。
  • 持续流式处理。
  • 上层软件希望按最新到达的数据连续读取。

5.4 Blob 读取 API

dd_get_blob

int32_t dd_get_blob(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);

功能:

  • 从最新写入位置向前回退一个 blob 大小。
  • 拷贝这一段数据到用户 buffer。

特点:

  • 不严格保证 blob 对齐。
  • 实际为“最新滑动窗口读取”,属于Slice模式数据读取的功能。

dd_get_blob_pop

int32_t dd_get_blob_pop(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);

功能:

  • 从 ring 中连续 pop blob_size 个 sample point。
  • 组成一个 blob 后拷贝给用户。

特点:

  • 读取后会消费 ring。
  • 适合队列式处理。

疑惑: 目前看来,这个 API 的实现逻辑和注释描述有些不一致,代码中是连续 pop 出一个 blob 大小的 sample point,似乎跟Blob模式的“只返回完整 blob”要求不太匹配,后续需要理清这个 API 的设计初衷和实现细节。

dd_get_blob_sync

int32_t dd_get_blob_sync(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);

功能:

  • 检查 ring 中是否存在完整 blob。
  • 如果当前 blob 未接收完整,则不读取当前 blob。
  • 读取最后一个完整 blob。

目前阅读程序逻辑,这个 API 最接近预期的标准 blob 模式:

如果当前 blob 未完整,则读取上一个完整 blob。
如果没有任何完整 blob,则返回失败。

6. 读取模式设计

6.1 Slice 模式

slice 模式强调实时性。

外部程序每次从 ring 中取出已经到达的 sample point。连续读取多个 sample point 或多个 slice 后,可以拼出一个 blob。

读取路径:

dd_get_slice()
  -> buf_ring_dequeue_mc()
  -> sample[index]
  -> memcpy 到用户 buffer

特点:

  • 读取最新到达的数据。
  • 读取后会消费 ring。
  • 对实时显示比较友好。

6.2 Blob 模式

blob 模式强调完整性。

外部程序读取一个完整 blob,而不是读取正在接收中的半个 blob。

特点:

  • 只返回完整 blob。
  • 当前 blob 未完成时,不应返回当前 blob,应当返回上一个最新的完整 blob。

6.3 反射内存模式

反射内存模式的目标是提供一个固定大小的数据窗口,让外部程序直接读取共享内存中的 blob 空间。

理想设计:

共享内存中维护一个 blob 大小的数据区
slice 到达后,根据 slice_id 写入固定 offset
如果 offset 空闲,直接写入
如果 offset 对应旧 slice,覆盖旧数据
外部程序直接读取共享内存中的完整 blob 区域

对于用户而言,反射内存模式表现上和slice模式比较类似。

7.当前疑惑点

  1. 在blob模式读取数据里面,涉及到的三个函数:dd_get_blob、dd_get_blob_pop、dd_get_blob_sync,涉及到blob模式读数据时,其中的细节看上去和我们的预期功能有些出入,这里还有一些疑惑。
    补:后面看了杰哥之前写的文档,dd_get_blob应该是相当于slice模式,dd_get_blob_pop则参数与dd_get_blob一致,功能上也差不多。但是dd_get_blob_pop是“拿走式读取”,涉及到对buf_ring消费者指针的推进,会在buf_ring里面连续 dequeue blob_size 个索引;而dd_get_blob则不会推进buf_ring消费者指针,属于“查看式读取”。但是两个都是slice模式读数据的功能,为什么需要一个消费一个不消费,这方面还有一些疑惑跟不清楚的地方。
  2. 这个文档目的是写给中能聚控看的,修改后准备给中能聚控交流。中能聚控这边软件说后续准备开始开发api,想了解我们这边对于api封装有没有什么要求,或者习惯使用哪种方式,这方面还有一些问题。

8.本周进展

上周组会后主要是写开题报告。报告初审意见没什么问题。

PL端程序还有时钟同步问题没有解决,这些问题应该是在杰哥给的程序里面就已经有了,只是测试的话应该没有问题。

跟中能聚控要了他们现在的上位机程序,按照中能聚控的文档,环境已经配好了,但是给的程序在我们这边执行的时候,存在一些问题。


本文章使用limfx的vscode插件快速发布