讲本章内容之前,首先介绍一下队列,在数据处理和嵌入式开发中,队列是一种核心的先进先出(First in First Out,FIFO)线性数据结构,其核心规则是仅允许在内存的一端执行入队操作、另一端执行出队操作,完美契合 “先到先处理” 的业务需求。队列主要分为普通队列和环形队列两种形式,其中环形队列是对普通队列的优化升级,解决了普通队列的内存浪费和效率问题,也是嵌入式开发中缓冲区设计的主流方案。

1 普通队列

普通队列可将连续的内存空间类比为一个个有序的 “存储单元”,每个单元存放待处理的数据,计算机始终从队列头依次处理数据,处理完成后释放当前单元的内存,新数据则从队列尾依次入队占用新单元。

这种设计存在明显缺陷:已处理数据的内存单元会被直接浪费,若想复用这些空闲单元,需要将队列中所有剩余待处理数据整体向队列头移动一格,而大量数据的循环移动会大幅降低计算机的处理效率,在海量数据交互场景中完全不具备实用性。

以下通过代码直观验证“处理后移动数据”和“处理后仅移指针不移动数据”的效率差异,前者是普通队列的低效实现,后者是对普通队列的初步优化,也为环形队列奠定基础:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>

// Test data size
#define TEST_SIZE 100000

// normal queue structure def
typedef struct
{
    int data[TEST_SIZE];
    int rear; // Only record the tail position, the head is fixed at 0
} MoveQueue;

// Init the queue that moves data after dequeue
void initMoveQueue(MoveQueue *q)
{
    q->rear = 0;
}

// Enqueue operation for MoveQueue
void enqueueMove(MoveQueue *q, int val)
{
    if (q->rear < TEST_SIZE)
    {
        q->data[q->rear++] = val;
    }
}

// Dequeue operation for MoveQueue
// Return value: -1 means the queue is empty, others mean the dequeued data
int dequeueMove(MoveQueue *q)
{
    if (q->rear == 0)
        return -1;              // Queue is empty
    int front_val = q->data[0]; // Get the head data of the queue

    // Core inefficient operation: cycle to move all remaining data forward
    for (int i = 0; i < q->rear - 1; i++)
    {
        q->data[i] = q->data[i + 1];
    }
    q->rear--; // Move the queue tail forward
    return front_val;
}

// normal queue structure def(Only Move Pointer After Dequeue)
typedef struct
{
    int data[TEST_SIZE];
    int front;
    int rear;
} NoMoveQueue;

// Init the queue that only moves pointer after dequeue
void initNoMoveQueue(NoMoveQueue *q)
{
    q->front = 0;
    q->rear = 0;
}

// Enqueue operation for NoMoveQueue
void enqueueNoMove(NoMoveQueue *q, int val)
{
    if (q->rear < TEST_SIZE)
    {
        q->data[q->rear++] = val;
    }
}

// Dequeue operation for NoMoveQueue
// Return value: -1 means the queue is empty, others mean the dequeued data
int dequeueNoMove(NoMoveQueue *q)
{
    if (q->front == q->rear)
        return -1; // Queue is empty

    // Core efficient operation: directly return head data, only move the head pointer
    return q->data[q->front++];
}

/************************ Common Tool: Generate Random Number (Simulate Business Data) ************************/
// Generate random integers from 0 to 999 to simulate pending business data
int getRandVal()
{
    return rand() % 1000;
}

int main()
{
    MoveQueue mq;
    NoMoveQueue nmq;
    clock_t start, end;

    // Timing variables with different precision: seconds for inefficient version, microseconds for efficient version
    double time_move_sec;     // Time consumed by inefficient version (in seconds)
    long long time_nomove_us; // Time consumed by efficient version (in microseconds)
    // Calculate the number of clock cycles per microsecond
    const double CLOCKS_PER_US = (double)CLOCKS_PER_SEC / 1000000.0;

    // 1. Initialize queues and fill with the same test data
    initMoveQueue(&mq);
    initNoMoveQueue(&nmq);
    for (int i = 0; i < TEST_SIZE; i++)
    {
        int val = getRandVal();
        enqueueMove(&mq, val);
        enqueueNoMove(&nmq, val);
    }
    printf("Test data size: %d pending elements\n", TEST_SIZE);
    printf("----------------------------------------\n");

    // 2. Test inefficient version: process all data and count time (in seconds, retain 6 decimal places)
    start = clock();
    while (mq.rear > 0)
    {
        dequeueMove(&mq); // Move data after each dequeue operation
    }
    end = clock();
    time_move_sec = (double)(end - start) / CLOCKS_PER_SEC;

    // 3. Test efficient version: process all data and count time (in microseconds, accurate display)
    start = clock();
    while (nmq.front < nmq.rear)
    {
        dequeueNoMove(&nmq); // Only move pointer, no data movement
    }
    end = clock();
    time_nomove_us = (long long)((double)(end - start) / CLOCKS_PER_US);

    // 4. Optimized output: seconds for inefficient version, microseconds for efficient version
    printf("Inefficient version (move data after dequeue) time: %.6f s\n", time_move_sec);
    printf("Efficient version (only move pointer) time: %lld us\n", time_nomove_us);
    return 0;
}

代码运行结果:

Test data size: 100000 pending elements
----------------------------------------
Inefficient version (move data after dequeue) time: 4.953000 s
Efficient version (only move pointer) time: 0 us

从结果可清晰看出:普通队列因数据移动导致处理效率极低,而仅通过指针偏移的方式,耗时大幅降低,效率提升近 5 万倍。也正是基于 “指针偏移” 的优化思路,环形队列应运而生,既保留了指针偏移的高效性,又解决了普通队列的内存浪费问题。

2 环形队列:内存复用的高效解决方案

计算机中并不存在真正 “环形” 的连续内存,环形队列本质是对普通内存的优化处理方式—— 通过特殊的指针操作,让一段连续的线性内存在数据读写时呈现 “首尾相连” 的环形效果,从而实现已处理数据内存单元的复用,彻底避免内存浪费。

2.1 环形队列核心设计

环形队列的底层仍是数组,核心是两个可灵活移动的指针:

头指针(front):指向缓冲区中下一个可读取 / 出队的数据位置,处理完数据后仅需偏移头指针,即可释放当前内存单元;

尾指针(rear):指向缓冲区中下一个可写入 / 入队的内存位置,存入数据后仅需偏移尾指针,即可指向新的空闲单元。

通过移动头、尾指针即可完成数据的读写操作,当指针到达数组末尾时,会通过特殊逻辑跳回数组起始位置,实现 “环形” 复用;当尾指针绕圈追上头指针时,队列满,需处理完数据(偏移头指针)后,才能继续存入新数据。

2.2 环形队列读写逻辑举例说明

假设环形队列底层数组的起始地址为 0,初始化时头指针和尾指针均指向地址 0
  1. 首个数据入队:存入地址 0 的内存单元,尾指针偏移至地址 1(下一个可写入位置);
  2. 第二个数据入队:存入地址 1 的内存单元,尾指针偏移至地址 2;
  3. 首个数据出队:处理地址 0 的内存单元数据,处理完成后释放该单元,头指针偏移至地址 1(下一个可读取位置);
  4. 后续新数据入队:当尾指针到达数组末尾后,会跳回地址 0,复用已释放的内存单元,实现环形读写。

到此,可能会有读者疑问这种特殊逻辑是什么,再具体展开讲解之前,我们先要解决一件事情,关于环形缓存区判空条件和判满条件的区分:

环形队列的头、尾指针会随读写操作不断偏移且可绕圈,由此带来一个核心问题:
普通的 “头指针 == 尾指针” 既表示队列为空,也表示队列为满,若不做特殊处理,会导致程序无法区分队列状态,引发数据覆盖或读取失败。
针对该问题,嵌入式开发中常用三种解决方案,各有优劣,可根据实际场景选择:

方案

核心逻辑

优点

缺点

牺牲一个单元

满队列:(rear+1)%MAX == front

1. 判空 / 判满仅需简单运算(O (1));

2. 无需额外变量,节省内存;

3. 代码简洁,执行速度快

1. 有效容量减少 1 个(可忽略,比如MAX_SIZE=1024,仅牺牲 1 个,影响极小)

增加状态变量

用is_full布尔变量标记(比如is_full=1表示满,is_full=0表示未满)

有效容量无损失(存满MAX_SIZE个数据)

1. 多占用 1 个字节内存(嵌入式中虽小,但能省则省);2. 入队 / 出队时需额外操作is_full变量(增加 CPU 指令);3. 多线程 / 中断场景下,is_full需加锁保护(复杂度飙升)

计数法(存数据个数)

用count变量记录有效数据个数(count=0为空,count=MAX_SIZE为满)

有效容量无损失,逻辑直观

1. 多占用 1 个整型变量(4 字节,嵌入式中不划算);2. 入队 / 出队时需更新count(增加 CPU 开销);3. 多线程 / 中断场景下,count需原子操作(避免计数错误,复杂度高)

嵌入式开发中首选「牺牲一个单元」方案:以极小的容量损失,换取 “无额外内存占用、无额外 CPU 开销、代码简洁” 的三重优势,完全契合嵌入式开发 “高效、省资源、低复杂度” 的核心需求。

以下实现基于「牺牲一个单元」的判空 / 判满逻辑,代码为嵌入式开发通用写法,包含初始化、判空、判满、入队、出队、有效数据计数、数据打印等完整功能,注释清晰,可直接嵌入项目使用:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// max size of the ring queue (sacrifice 1 unit for full judge, actual size = MAX_SIZE - 1)
#define MAX_QUEUE_SIZE 8

// ring queue structure def
typedef struct
{
    int data[MAX_QUEUE_SIZE];
    int front;
    int rear;
} RingQueue;

/************************ API ************************/
// init the ring queue: both head and tail pointer point to the start address
void RingQueue_Init(RingQueue *q)
{
    if (q == NULL)
        return;
    memset(q->data, 0, sizeof(q->data));
    q->front = 0;
    q->rear = 0;
}

// judge the ring queue is empty
int RingQueue_IsEmpty(RingQueue *q)
{
    if (q == NULL)
        return 1;
    return (q->front == q->rear);
}

// judge the ring queue is full
int RingQueue_IsFull(RingQueue *q)
{
    if (q == NULL)
        return 1;
    return ((q->rear + 1) % MAX_QUEUE_SIZE == q->front);
}

// calculate the valid number of ring queue
int RingQueue_Count(RingQueue *q)
{
    if (q == NULL)
        return 0;
    return ((q->rear - q->front + MAX_QUEUE_SIZE) % MAX_QUEUE_SIZE);
}

// enqueue operation:
// return value: 0-failed, 1-successful
int RingQueue_Enqueue(RingQueue *q, int val)
{
    if (q == NULL || RingQueue_IsFull(q))
    {
        printf("enqueue failed, the queue is full or null");
        return 0;
    }
    q->data[q->rear] = val;
    q->rear = (q->rear + 1) % MAX_QUEUE_SIZE; // Move tail pointer back, jump to 0 at the end
    printf("Enqueue successful: data %d stored in memory unit[%d]\n", val, (q->rear - 1 + MAX_QUEUE_SIZE) % MAX_QUEUE_SIZE);
    return 1;
}

// enqueue operation:
// return value: -1-failed, others-valid data of queue
int RingQueue_Dequeue(RingQueue *q)
{
    if (q == NULL || RingQueue_IsEmpty(q))
    {
        printf("dequeue failed, the queue is full or null\n");
        return -1;
    }
    int val = q->data[q->front];
    q->front = (q->front + 1) % MAX_QUEUE_SIZE;
    printf("Dequeue successful: process data %d, release memory unit[%d]\n", val, (q->front - 1 + MAX_QUEUE_SIZE) % MAX_QUEUE_SIZE);
    return val;
}

// Print all valid data of the current ring queue (in FIFO order)
void RingQueue_Print(RingQueue *q)
{
    if (q == NULL || RingQueue_IsEmpty(q))
    {
        printf("the queue is full or null\n");
        return;
    }
    printf("Valid data of the current ring queue (FIFO):");
    int i = q->front;
    while (i != q->rear)
    {
        printf("%d ", q->data[i]);
        i = (i + 1) % MAX_QUEUE_SIZE;
    }
    printf("\n");
}

int main()
{
    RingQueue q;
    // 1. 初始化环形队列
    RingQueue_Init(&q);
    printf("Ring queue initialization completed, head/tail pointers all point to address 0, maximum capacity: %d, valid capacity: %d\n",
           MAX_QUEUE_SIZE, MAX_QUEUE_SIZE - 1);
    printf("----------------------------------------\n");

    // 2. Enqueue: store 6 data continuously
    int data[] = {10, 20, 30, 40, 50, 60};
    for (int i = 0; i < sizeof(data) / sizeof(int); i++)
    {
        RingQueue_Enqueue(&q, data[i]);
    }
    printf("Number of current data in the queue: %d\n", RingQueue_Count(&q));
    RingQueue_Print(&q);
    printf("----------------------------------------\n");

    // 3. Dequeue: process the first 3 data (release 3 memory units)
    for (int i = 0; i < 3; i++)
    {
        RingQueue_Dequeue(&q);
    }
    printf("Number of current data in the queue: %d\n", RingQueue_Count(&q));
    RingQueue_Print(&q);
    printf("----------------------------------------\n");

    // 4. Continue enqueuing: store new data (reuse released memory units to realize ring reuse)
    RingQueue_Enqueue(&q, 70);
    RingQueue_Enqueue(&q, 80);
    RingQueue_Enqueue(&q, 90);
    printf("Number of current data in the queue: %d\n", RingQueue_Count(&q));
    RingQueue_Print(&q);
    printf("----------------------------------------\n");

    // 5. Try to enqueue: queue is full, enqueue failed (verify full judgment logic)
    RingQueue_Enqueue(&q, 100);
    printf("----------------------------------------\n");

    // 6. Dequeue: process all remaining data (verify empty judgment logic)
    while (!RingQueue_IsEmpty(&q))
    {
        RingQueue_Dequeue(&q);
    }
    RingQueue_Print(&q);
    // Try to dequeue from empty queue, verify failure logic
    RingQueue_Dequeue(&q);

    return 0;
}

3 stm32串口接收实战

stm32串口接收是被动的,外部数据什么时候到来,单次来多少是不可控的,如果直接在中断里面处理数据会导致:
  1. 中断执行时间过长、阻塞高优先级中断(嵌入式大忌);
  2. 新数据到来时,未处理完的旧数据被硬件覆盖,造成数据丢失;
  3. 硬件层和应用层耦合严重,系统响应变慢。
基于此,我们以stm32 hal库为基础,编写伪代码,供您参考:
extern UART_HandleTypeDef huart1; // 外部引用串口1句柄

UartRxRingBuf UART1_RxRingBuf; // 串口1接收环形缓冲区

/************************ 环形缓冲区核心函数实现 ************************/
void UART_RxRingBuf_Init(UartRxRingBuf *q)
{
    if (q == NULL)
        return;
    memset(q->data, 0, sizeof(q->data));
    q->front = 0;
    q->rear = 0;
}

// 判断缓冲区是否为空:1-空,0-非空
uint8_t UART_RxRingBuf_IsEmpty(UartRxRingBuf *q)
{
    if (q == NULL)
        return 1;
    return (q->front == q->rear) ? 1 : 0;
}

// 判断缓冲区是否为满:1-满,0-非满(牺牲1个单元,避免空/满歧义)
uint8_t UART_RxRingBuf_IsFull(UartRxRingBuf *q)
{
    if (q == NULL)
        return 1;
    return ((q->rear + 1) % UART_RX_BUF_SIZE == q->front) ? 1 : 0;
}

// 获取缓冲区当前有效字节数
uint16_t UART_RxRingBuf_GetCount(UartRxRingBuf *q)
{
    if (q == NULL)
        return 0;
    return (q->rear - q->front + UART_RX_BUF_SIZE) % UART_RX_BUF_SIZE;
}

uint8_t UART_RxRingBuf_Enqueue(UartRxRingBuf *q, UART_DATA_TYPE val)
{
    if (q == NULL || UART_RxRingBuf_IsFull(q))
        return 0;                               
    q->data[q->rear] = val;                     
    q->rear = (q->rear + 1) % UART_RX_BUF_SIZE; 
    return 1;
}

// 出队操作:从缓冲区取出1字节数据,在主循环中调用!
int32_t UART_RxRingBuf_Dequeue(UartRxRingBuf *q)
{
    if (q == NULL || UART_RxRingBuf_IsEmpty(q))
        return -1;                                
    UART_DATA_TYPE val = q->data[q->front];       
    q->front = (q->front + 1) % UART_RX_BUF_SIZE; 
    return (int32_t)val;

void USART1_IRQHandler(void)
{
    uint8_t rx_byte; // 存储接收的1个字节
    // 判断:是否是接收非空中断(RXNE)
    if (__HAL_UART_GET_FLAG(&huart1, UART_FLAG_RXNE) != RESET)
    {
        rx_byte = (uint8_t)USART1->DR; // 读取串口数据寄存器,获取1字节(清RXNE标志位)
        // 核心:将接收字节入队到环形缓冲区(中断中唯一的核心操作)
        UART_RxRingBuf_Enqueue(&UART1_RxRingBuf, rx_byte);
    }

    // 其他中断标志位处理(如溢出错误,可选添加)
    if (__HAL_UART_GET_FLAG(&huart1, UART_FLAG_ORE) != RESET)
    {
        __HAL_UART_CLEAR_FLAG(&huart1, UART_FLAG_ORE); // 清除溢出错误标志
        (void)USART1->DR;                              // 读DR寄存器,彻底清标志
    }
}

以上就是本期内容了,如果有不恰当的地方,望您海涵,同时,欢迎提出您宝贵的建议!

Logo

智能硬件社区聚焦AI智能硬件技术生态,汇聚嵌入式AI、物联网硬件开发者,打造交流分享平台,同步全国赛事资讯、开展 OPC 核心人才招募,助力技术落地与开发者成长。

更多推荐