本系统通过 UDP 包接收外部采集设备发送的数据。采集设备会按照指定采样率进行采集,每一次采样会得到所有开启通道的 ADC 数据。
一次采样的结果可以理解为一个 sample point:
sample point
ch0: ADC_SAMPLE
ch1: ADC_SAMPLE
ch2: ADC_SAMPLE
...
多个连续 sample point 组成一个 blob。由于一个 blob 通常较大,无法放入单个 UDP 包,因此发送端会将一个 blob 拆分为多个 slice,每个 UDP 包发送一个 slice。
数据关系如下:
blob
slice 0
sample point 0: ch0, ch1, ch2, ...
sample point 1: ch0, ch1, ch2, ...
slice 1
sample point N: ch0, ch1, ch2, ...
sample point N+1: ch0, ch1, ch2, ...
sample point 表示同一采样时刻下,所有开启通道的采样值集合。
例如,如果开启 160 个通道,并且每个通道数据类型为16位位宽的 ADC_SAMPLE:
typedef uint16_t ADC_SAMPLE;
那么一个 sample point 的大小为:
160 * sizeof(ADC_SAMPLE)
slice 表示一个 UDP 包中携带的一段连续 sample point。
例如:
slice_size = 5
channel_num = 160
则一个 slice 中包含:
5 * 160 个 ADC_SAMPLE
slice 是网络传输层面的拆包单位。
blob 表示一次较完整的数据读取单元,由多个连续 sample point 组成。
例如:
blob_size = 1024
channel_num = 160
则一个 blob 中包含:
1024 * 160 个 ADC_SAMPLE
blob 是上层软件更常用的数据处理单位。
当前 dpdk_recv.c 中接收端将 UDP payload 前部解析为 DD_PayloadHeader:
typedef struct {
uint32_t data_blob_sample_cnt;
uint32_t blob_size;
uint16_t slice_size;
uint8_t channel_enable;
uint8_t downsampling;
uint8_t job_id;
uint8_t tmp[3];
} DD_PayloadHeader;
字段说明:
| 字段 | 含义 |
|---|---|
data_blob_sample_cnt |
当前 slice 在数据流或 blob 中的采样点位置 |
blob_size |
一个 blob 包含多少个 sample point |
slice_size |
当前 UDP 包包含多少个 sample point |
channel_enable |
当前开启通道数量 |
downsampling |
降采样参数 |
job_id |
当前采集任务编号 |
tmp |
保留字段 |
当前接收端会根据 header 计算:
blob_id = data_blob_sample_cnt / blob_size + 1;
slice_id = data_blob_sample_cnt % blob_size / slice_size + 1;
目前这部分程序还对应旧版Header,没有同步现在更新的Payload Header格式。
dd.hdd.h 是 DD 模块对外 API 声明文件。
它定义了:
ADC_SAMPLE当前 API 总览:
int32_t dd_init(uint32_t mode, uint32_t ring_size, uint32_t job_id);
int32_t dd_status(uint32_t job_id);
int32_t dd_config(uint32_t channel_num, uint32_t blob_size, uint32_t stream_mode, uint32_t job_id);
void *dd_get_buf(uint32_t *index, uint32_t sample_num, uint32_t job_id);
int32_t dd_enqueue_slice(uint32_t start_index, uint32_t slice_num, uint32_t job_id);
ADC_SAMPLE *dd_dequeue_1slice(uint32_t job_id);
int32_t dd_get_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);
int32_t dd_put_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);
int32_t dd_get_blob(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
int32_t dd_get_blob_pop(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
int32_t dd_get_blob_sync(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
int32_t dd_get_blob_copy(ADC_SAMPLE *buf, uint32_t offset, uint32_t count);
uint32_t dd_count(uint32_t job_id);
uint32_t dd_close(void);
dd.cdd.c 是 DD API 的核心实现。
它负责:
buf_ring。sample[] 数据区。共享内存路径:
/tmp/shm/dd.shm.<job_id>
共享内存定义的结构:
typedef struct {
SHM_HEADER *pHeader;
struct buf_ring *br;
ADC_SAMPLE *sample;
uint32_t sample_max_num;
uint32_t sample_tail;
uint32_t last_sample_tail;
uint32_t sample_blob_last;
uint32_t channel_num;
uint32_t blob_size;
uint32_t stream_mode;
uint32_t blob_id;
uint32_t blob_id_pop;
} DD_Context;
static DD_Context g_ctx[JOB_NUM_MAX];
在程序中,g_ctx[job_id]表示某一个采集任务 job_id 的运行上下文。例如,g_ctx[1]对应的就是job_id为1的采集任务中,共享内存、ring、采集数据区、blob状态等信息。
共享内存结构大致为:
共享内存起始地址
|
-> SHM_HEADER
magic
shm_len
buf_ring br
br_ring[] // 存sample索引
sample[] // 存真正采集数据
pHeader作为指针,指向共享内存头部SHM_HEADER,在shm.h里面有定义:
typedef struct {
uint32_t magic; // 校验共享内存是否有效
uint32_t shm_len; // 共享内存长度
struct buf_ring br __attribute__((aligned(CACHE_LINE_SIZE))); // 共享内存里的 ring 队列对象
} SHM_HEADER;
buf_ring 是一个环形索引队列,里面每个元素记录 sample[] 中某个 sample point 的位置。 生产者把“新写好的 sample 位置”放进去,消费者从里面取出“可读取 sample 位置”。
sample[] 是一个连续的内存区域,存储真正的采集数据。 每个 sample point 的大小为 channel_num * sizeof(ADC_SAMPLE),即一个采样点所有开启通道的采集数据。
可以大概这样子去理解采样点采集数据存储的结构:
sample point 0: sample[0 ... channel_num-1]
sample point 1: sample[channel_num ... 2*channel_num-1]
sample point 2: sample[2*channel_num ... 3*channel_num-1]
buf_ring则存储各个采样点的索引。
sample_tail:表示 sample[] 当前写入尾部位置的索引,生产者每次写入数据后都会更新这个索引。 last_sample_tail:记录上一次读取 blob 时看到的尾部位置,用于判断有没有新数据写入,如果last_sample_tail!=sample_tail,说明有新数据写入了,此时将会更新blob_id和last_sample_tail。
sample用于指向共享内存里面存储采集数据的区域。其中ADC_SAMPLE,表示一个通道一次采集得2字节数据。
channel_num、blob_size、stream_mode: 根据UDP payload header配置的通道数、一个blob包含的采样点数目、当前采集工作模式的相关参数。
blob_id: blob 编号计数器,高速当前读到的是第几个blob。
blob_id_pop: 表示通过pop方式消费了第几个blob,每次成功pop一个完整blob,就会:
g_ctx[job_id].blob_id_pop++;
dpdk_recv.cdpdk_recv.c 是接收端主程序。
它负责:
udpdk_recvfrom() 接收 UDP payload。DD_PayloadHeader。job_id 初始化 DD producer。当UDP包到来后:
dpdk_send.cdpdk_send.c 是测试发送端程序,用于模拟采集设备发送 UDP 数据。
它负责:
DD_PayloadHeader。udpdk_sendto() 发送 UDP 包。dd_initint32_t dd_init(uint32_t mode, uint32_t ring_size, uint32_t job_id);
功能:
job_id 区分不同采集任务。典型调用:
dd_init(MODE_PRODUCER, 0, job_id);
dd_init(MODE_CONSUMER, 0, job_id);
dd_configint32_t dd_config(uint32_t channel_num, uint32_t blob_size, uint32_t stream_mode, uint32_t job_id);
功能:
dd_statusint32_t dd_status(uint32_t job_id);
功能:
dd_countuint32_t dd_count(uint32_t job_id);
功能:
dd_get_bufvoid *dd_get_buf(uint32_t *index, uint32_t sample_num, uint32_t job_id);
功能:
sample[] 中申请一段写入区域。index 返回起始 sample 索引。这个 API 主要由 dpdk_recv.c 使用。接收端可以将 UDP payload 直接写入共享内存。
dd_enqueue_sliceint32_t dd_enqueue_slice(uint32_t start_index, uint32_t slice_num, uint32_t job_id);
功能:
dd_put_sliceint32_t dd_put_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);
功能:
dd_get_sliceint32_t dd_get_slice(ADC_SAMPLE *pSample, uint32_t num_group, uint32_t job_id);
功能:
num_group 个 sample point,一般这里的num_group表示一个slice里面的采样点数目。适用连续数据采集下的数据读取。
适用场景:
dd_get_blobint32_t dd_get_blob(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
功能:
特点:
dd_get_blob_popint32_t dd_get_blob_pop(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
功能:
blob_size 个 sample point。特点:
疑惑: 目前看来,这个 API 的实现逻辑和注释描述有些不一致,代码中是连续 pop 出一个 blob 大小的 sample point,似乎跟Blob模式的“只返回完整 blob”要求不太匹配,后续需要理清这个 API 的设计初衷和实现细节。
dd_get_blob_syncint32_t dd_get_blob_sync(ADC_SAMPLE *pSample, uint32_t job_id, uint32_t *blob_id);
功能:
目前阅读程序逻辑,这个 API 最接近预期的标准 blob 模式:
如果当前 blob 未完整,则读取上一个完整 blob。
如果没有任何完整 blob,则返回失败。
slice 模式强调实时性。
外部程序每次从 ring 中取出已经到达的 sample point。连续读取多个 sample point 或多个 slice 后,可以拼出一个 blob。
读取路径:
dd_get_slice()
-> buf_ring_dequeue_mc()
-> sample[index]
-> memcpy 到用户 buffer
特点:
blob 模式强调完整性。
外部程序读取一个完整 blob,而不是读取正在接收中的半个 blob。
特点:
反射内存模式的目标是提供一个固定大小的数据窗口,让外部程序直接读取共享内存中的 blob 空间。
理想设计:
共享内存中维护一个 blob 大小的数据区
slice 到达后,根据 slice_id 写入固定 offset
如果 offset 空闲,直接写入
如果 offset 对应旧 slice,覆盖旧数据
外部程序直接读取共享内存中的完整 blob 区域
对于用户而言,反射内存模式表现上和slice模式比较类似。
上周组会后主要是写开题报告。报告初审意见没什么问题。
PL端程序还有时钟同步问题没有解决,这些问题应该是在杰哥给的程序里面就已经有了,只是测试的话应该没有问题。
跟中能聚控要了他们现在的上位机程序,按照中能聚控的文档,环境已经配好了,但是给的程序在我们这边执行的时候,存在一些问题。
本文章使用limfx的vscode插件快速发布