一、一维指针“隔山打牛”式的操作
#include <stdio.h>int main()
{
#if 0char a = 'k';char b = 'm';char *c = &a;printf("%c\n", *c);char **d;d = &c;*d = &b; //通过二维指针d来间接修改c的指向printf("%c\n", *c);
// $ gcc t4.c ; ./a.out
// k
// m
#elif 1char a = 'k';char b = 'm';char *c = &a;printf("%c\n", *c);void *d;// 64位系统下 d变量连续占用了8个字节 保存了c的地址d = &c;//通过一维指针d来间接修改c的指向 , d本身就是c的地址*(char**)d = &b;printf("%c\n", *c);
// $ gcc t4.c ; ./a.out
// k
// m
#endif
}
上面代码展示了两种 对一维指针c的修改方法。
第二种方法是非常规操作,本身d是一维指针,但是使用时又当二维指针来用,达到了间接修改变量c的指向的效果。这个是关键:
1. d具有一维指针,操作方便,对于需要基于d做偏移操作的情况,很方便。
2. d可以当做二维指针来修改指向的一维指针的值。 这个做法在我之前的文章“内存池-不定长实现”中有大量使用。省去了代码中大量的赋值转换操作。
二、队列--用户数据的分离式实现
要理解workflow的的队列实现,要先理解队列用户数据分离式实现的思想。
即 队列操作和 用户节点的结构体类型进行分离,用户节点的结构体对于队列的所有操作函数都不可见。
类似于C++的模板思想。好处就是队列的一整套操作函数,适用于任何封装的用户队列节点。
队列的操作函数不关心 用户数据的结构体封装,只需要里面有个next指针可以保存下个节点地址就行。同时需要知道这个指针相对于结构体的偏移。
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#define LINK_OP// 定义队列节点
typedef struct Node {int data; // 数据部分struct Node *next; // 指向下一个节点的指针
} Node;// 定义队列
typedef struct Queue {void *front; // 队头指针void *rear; // 队尾指针
} Queue;// 创建一个新的队列
Queue *create_queue() {Queue *q = (Queue *)malloc(sizeof(Queue));if (!q) {perror("Failed to create queue");return NULL;}q->front =NULL;q->rear = NULL; // 初始化队列为空return q;
}// 入队操作
void enqueue(Queue *q, void *node) {int offset = offsetof(Node, next);void *next = (char *)node + offset;*(void**)next = NULL; // 新节点是队尾,指向NULLif (q->rear == NULL) {// 如果队列为空,队头和队尾都指向新节点q->front = q->rear = next;return;}*(void**)q->rear = next; // 把当前队尾的next指向新节点q->rear = next; // 更新队尾为新节点
}// 出队操作
void * dequeue(Queue *q) {int offset = offsetof(Node, next);void *node = q->front;if (node == NULL) {printf("Queue is empty\n");return NULL; // 队列为空,返回NULL}q->front = *(void **)node; // 队头指针移到下一个节点if (q->front == NULL) {// 如果队列变为空,队尾指针也要设置为NULLq->rear = NULL;}return node - offset; // 返回出队的值
}// 查看队列头部的元素
void* peek(Queue *q) {if (q->front == NULL) {printf("Queue is empty\n");return NULL; // 队列为空,返回NULL}void *node = q->front;int offset = offsetof(Node, next);return node - offset;
}// 销毁队列
void destroy_queue(Queue *q) {while (peek(q) != NULL) {void *node = dequeue(q); // 一一出队并释放内存free(node);}free(q); // 释放队列本身
}int main() {Queue *q = create_queue(); // 创建队列// 入队操作int i;for (i = 1; i < 5; i++) {Node *new_node = (Node *)malloc(sizeof(Node));if (!new_node) {perror("Failed to enqueue");return EXIT_FAILURE;}new_node->data = i * 10;enqueue(q, new_node);}// 查看队列头部元素printf("Front element is: %d\n", peek(q) == NULL ? -1 : ((Node *)peek(q))->data);// 出队操作Node *node = (Node *)dequeue(q);if (node != NULL) {printf("Dequeued: %d\n", node->data);free(node);}node = (Node *)dequeue(q);if (node != NULL) {printf("Dequeued: %d\n", node->data);free(node);}// 再次查看队列头部元素printf("Front element is: %d\n", peek(q) == NULL ? -1 : ((Node *)peek(q))->data);// 出队剩余元素node = (Node *)dequeue(q);if (node != NULL) {printf("Dequeued: %d\n", node->data);free(node);}node = (Node *)dequeue(q);if (node != NULL) {printf("Dequeued: %d\n", node->data);free(node);}for (i = 1; i < 5; i++) {Node *new_node = (Node *)malloc(sizeof(Node));if (!new_node) {perror("Failed to enqueue");return EXIT_FAILURE;}new_node->data = i * 11;enqueue(q, new_node);}// 查看队列头部元素printf("Front element is: %d\n", peek(q) == NULL ? -1 : ((Node *)peek(q))->data);// 出队操作node = (Node *)dequeue(q);if (node != NULL) {printf("Dequeued: %d\n", node->data);free(node);}node = (Node *)dequeue(q);if (node != NULL) {printf("Dequeued: %d\n", node->data);free(node);}// 销毁队列destroy_queue(q);return 0;
}
执行结果:
$ ./a.out
Front element is: 10
Dequeued: 10
Dequeued: 20
Front element is: 30
Dequeued: 30
Dequeued: 40
Front element is: 11
Dequeued: 11
Dequeued: 22
Queue is empty
代码中用户数据struct Node对队列的操作函数是不可见的。
三、sogou的workflow 队列 源码
源码
msgqueue.c
/*Copyright (c) 2020 Sogou, Inc.Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, softwaredistributed under the License is distributed on an "AS IS" BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.See the License for the specific language governing permissions andlimitations under the License.Author: Xie Han (xiehan@sogou-inc.com)
*//** This message queue originates from the project of Sogou C++ Workflow:* https://github.com/sogou/workflow** The idea of this implementation is quite simple and obvious. When the* get_list is not empty, the consumer takes a message. Otherwise the consumer* waits till put_list is not empty, and swap two lists. This method performs* well when the queue is very busy, and the number of consumers is big.*/#include <errno.h>
#include <stdlib.h>
#include <pthread.h>
#include "msgqueue.h"struct __msgqueue
{size_t msg_max;size_t msg_cnt;int linkoff;int nonblock;void *head1;void *head2;void **get_head;void **put_head;void **put_tail;pthread_mutex_t get_mutex;pthread_mutex_t put_mutex;pthread_cond_t get_cond;pthread_cond_t put_cond;
};void msgqueue_set_nonblock(msgqueue_t *queue)
{queue->nonblock = 1;pthread_mutex_lock(&queue->put_mutex);pthread_cond_signal(&queue->get_cond);pthread_cond_broadcast(&queue->put_cond);pthread_mutex_unlock(&queue->put_mutex);
}void msgqueue_set_block(msgqueue_t *queue)
{queue->nonblock = 0;
}static size_t __msgqueue_swap(msgqueue_t *queue)
{void **get_head = queue->get_head;size_t cnt;queue->get_head = queue->put_head;pthread_mutex_lock(&queue->put_mutex);while (queue->msg_cnt == 0 && !queue->nonblock)pthread_cond_wait(&queue->get_cond, &queue->put_mutex);cnt = queue->msg_cnt;if (cnt > queue->msg_max - 1)pthread_cond_broadcast(&queue->put_cond);queue->put_head = get_head;queue->put_tail = get_head;queue->msg_cnt = 0;pthread_mutex_unlock(&queue->put_mutex);return cnt;
}void msgqueue_put(void *msg, msgqueue_t *queue)
{void **link = (void **)((char *)msg + queue->linkoff);*link = NULL;pthread_mutex_lock(&queue->put_mutex);while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock)pthread_cond_wait(&queue->put_cond, &queue->put_mutex);*queue->put_tail = link;queue->put_tail = link;queue->msg_cnt++;pthread_mutex_unlock(&queue->put_mutex);pthread_cond_signal(&queue->get_cond);
}void *msgqueue_get(msgqueue_t *queue)
{void *msg;pthread_mutex_lock(&queue->get_mutex);if (*queue->get_head || __msgqueue_swap(queue) > 0){msg = (char *)*queue->get_head - queue->linkoff;*queue->get_head = *(void **)*queue->get_head;}elsemsg = NULL;pthread_mutex_unlock(&queue->get_mutex);return msg;
}msgqueue_t *msgqueue_create(size_t maxlen, int linkoff)
{msgqueue_t *queue = (msgqueue_t *)malloc(sizeof (msgqueue_t));int ret;if (!queue)return NULL;ret = pthread_mutex_init(&queue->get_mutex, NULL);if (ret == 0){ret = pthread_mutex_init(&queue->put_mutex, NULL);if (ret == 0){ret = pthread_cond_init(&queue->get_cond, NULL);if (ret == 0){ret = pthread_cond_init(&queue->put_cond, NULL);if (ret == 0){queue->msg_max = maxlen;queue->linkoff = linkoff;queue->head1 = NULL;queue->head2 = NULL;queue->get_head = &queue->head1;queue->put_head = &queue->head2;queue->put_tail = &queue->head2;queue->msg_cnt = 0;queue->nonblock = 0;return queue;}pthread_cond_destroy(&queue->get_cond);}pthread_mutex_destroy(&queue->put_mutex);}pthread_mutex_destroy(&queue->get_mutex);}errno = ret;free(queue);return NULL;
}void msgqueue_destroy(msgqueue_t *queue)
{pthread_cond_destroy(&queue->put_cond);pthread_cond_destroy(&queue->get_cond);pthread_mutex_destroy(&queue->put_mutex);pthread_mutex_destroy(&queue->get_mutex);free(queue);
}
msgqueue.h
/*Copyright (c) 2020 Sogou, Inc.Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, softwaredistributed under the License is distributed on an "AS IS" BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.See the License for the specific language governing permissions andlimitations under the License.Author: Xie Han (xiehan@sogou-inc.com)
*/#ifndef _MSGQUEUE_H_
#define _MSGQUEUE_H_#include <stddef.h>typedef struct __msgqueue msgqueue_t;#ifdef __cplusplus
extern "C"
{
#endif/* A simple implementation of message queue. The max pending messages may* reach two times 'maxlen' when the queue is in blocking mode, and infinite* in nonblocking mode. 'linkoff' is the offset from the head of each message,* where spaces of one pointer size should be available for internal usage.* 'linkoff' can be positive or negative or zero. */msgqueue_t *msgqueue_create(size_t maxlen, int linkoff);
void msgqueue_put(void *msg, msgqueue_t *queue);
void *msgqueue_get(msgqueue_t *queue);
void msgqueue_set_nonblock(msgqueue_t *queue);
void msgqueue_set_block(msgqueue_t *queue);
void msgqueue_destroy(msgqueue_t *queue);#ifdef __cplusplus
}
#endif#endif
关键点:
1. 用户的数据结构体--需要在用户层自定义,里面必须要包含一个指针变量来保存下一个节点地址。这个变量相对于结构体的地址偏移,以linkoff的形参在队列创建函数中传入。
2.linkoff的计算--用系统宏
typedef struct _message_t{int id;struct _message_t *next;
} message_t;#include <stddef.h>
int linkoff = offsetof(message_t, next);
3. 代码中采用两条链表来管理,对于上层用户感知不到这一点。两条队列 : 一条保存入队列的任务,另一条队列用来出队列执行任务(结点)。也就是说一条队列只有enqueue操作,另一条只有dequeue操作。
4. 两条队列分别有互斥锁-条件变量来保证线程安全。大量情况下入队的线程之间会产生竞争,出队的线程之间会产生竞争。只有在“出队任务”为空下,才会和入队任务线程产生竞争。
当出队的队列为空时,且入队的结点数量达到一定时,则交换两条队列的头尾指针。(相当于是队列交换,即函数:__msgqueue_swap())。
5. 管理结构体本质是*head1 和 *head2来管理的,**put_head和**get_head只是为了代码编写方便。完全可以改为,有兴趣的可以自己实现。
struct __msgqueue
{size_t msg_max;size_t msg_cnt;int linkoff;int nonblock;void *get_head;void *put_head;void *put_tail;pthread_mutex_t get_mutex;pthread_mutex_t put_mutex;pthread_cond_t get_cond;pthread_cond_t put_cond;
};
测试代码
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "msgqueue.h"#define MAX_MSGS 10 // 最大消息数量
#define NUM_PRODUCERS 3 // 生产者线程数量
#define NUM_CONSUMERS 2 // 消费者线程数量
#define MAX_MSG_SIZE 100 // 消息大小// 消息结构
typedef struct _message_t{int id;struct _message_t *next;
} message_t;// 生产者线程函数
void *producer(void *arg) {msgqueue_t *queue = (msgqueue_t *)arg;message_t *msg;static int id = 0;while (1) {msg = (message_t *)malloc(sizeof(message_t));if (!msg) {perror("malloc failed");return NULL;}msg->id = id++;printf("Producer: Producing message %d\n", msg->id);msgqueue_put(msg, queue);usleep(rand() % 1000000); // 随机等待,模拟生产者的工作时间}return NULL;
}// 消费者线程函数
void *consumer(void *arg) {msgqueue_t *queue = (msgqueue_t *)arg;message_t *msg;while (1) {msg = (message_t *)msgqueue_get(queue);if (msg) {printf("Consumer: Consuming message %d\n", msg->id);free(msg); // 处理完消息后释放内存} else {printf("Consumer: No message to consume, waiting...\n");usleep(100000); // 没有消息时等待一段时间}}return NULL;
}int main() {msgqueue_t *queue;pthread_t producers[NUM_PRODUCERS], consumers[NUM_CONSUMERS];int i;// 创建消息队列queue = msgqueue_create(MAX_MSGS, offsetof(message_t, next));if (!queue) {perror("Failed to create message queue");return EXIT_FAILURE;}// 启动生产者线程for (i = 0; i < NUM_PRODUCERS; i++) {if (pthread_create(&producers[i], NULL, producer, (void *)queue) != 0) {perror("Failed to create producer thread");return EXIT_FAILURE;}}// 启动消费者线程for (i = 0; i < NUM_CONSUMERS; i++) {if (pthread_create(&consumers[i], NULL, consumer, (void *)queue) != 0) {perror("Failed to create consumer thread");return EXIT_FAILURE;}}// 等待线程结束(这里只是示范,实际情况应该有更好的线程管理机制)for (i = 0; i < NUM_PRODUCERS; i++) {pthread_join(producers[i], NULL);}for (i = 0; i < NUM_CONSUMERS; i++) {pthread_join(consumers[i], NULL);}// 销毁消息队列msgqueue_destroy(queue);return EXIT_SUCCESS;
}
0voice · GitHub