乐愚社区Beta

 编程语言  >  这位创造Github冠军项目的老男人,堪称10倍程序员本尊

这位创造Github冠军项目的老男人,堪称10倍程序员本尊

饲养员  L0  • 2019-07-29 • 回复 5 • 只看楼主举报    


7月12日一款叫做TDengine的时序数据库项目在Github上开源了,这个项目一经发布就稳稳占据了Github排行榜的C位,目前TdEngine已经累积了5000多个star,并且连续一周排在上升榜首位。而且你要知道TdEngine的开发语言并不是火热的Python或JAVA,而是C语言。C语言无巧可取,虽见功夫,但是代码比较难读,能引发如此的关注绝对堪称奇迹,在我印象中即使是Mysql也没有达到如此的热度。

相信很多人也和笔者一样,是通过《比hadoop快至少10 倍的物联网大数据平台,我把它开源》的刷屏文才了解到陶老师与TdEngine的,当看到这位50岁的IT老兵老兵,依旧奋斗在编程一线,为TDengine开发贡献3万行代码时候,我就立刻四处向朋友打听,并最终要了陶老师的微信,做为一名80后程序员,我近不急待的想和陶老师直接沟通,想从他身上找到保持编程水平的秘决。

大神面对面-这才是10倍程序员该有的样子

2008年的时候笔者还是CSDN论坛WINDOWS MOBILE版的版主,从事手机导航软件的开发工作,而在彼时陶老师也创办了和信公司,并亲自开发了WindowsMobile版和信客户端,相同的开发平台经历也让我们迅速的拉进了彼此的距离。

在使用TdEngine的过程中我发现了两个小问题,一是数据库用户密码明文存放,二是数据文件权限设置不合理。让我十分震惊的是,这两个问题是我下午在和陶老师聊天时提出的,当晚发布版本就把问题全部解决了。后来沟通得知这些BUG都是陶老师自己动手修改的。我意识到TdEngine的效率应该来自于创始人对于代码的执着与热爱,而不是对员工996式的工作要求。

陶老师是真的爱编程,尤其对于代码运行效率有着近乎狂热的追求,我查阅了陶老师近年来的作品,其和信客户端只有18K大小,胎心算法的实现只用了600行代码,而TDengine这样一个数据库项目竟然只需要1.5M安装包就能搞定,在手机APP都动辙上百M的今天,TDengine体量甚至显得有些异类。如果没有深厚的功底和坚定的信念是绝对无法达到如此高度的。我想陶老师应该就是传说中10倍程序员的典范吧。

10倍程序员对于他周围亲友的影响也是非常巨大的,当我打开TdEngine的官网,其简洁明快的风格,一目了然的配图,实在让我无法把这一切和一位年近半百的老派IT人士联系到一起,当然后来我和陶老师聊到这件事的时候才知道,整个网站从设计、前端、后台、浏览器适配、数据分析到搜索引擎优化,都是由陶老师的儿子,一位刚刚高中毕业的00后操刀主持的,而且整个网站从无到有只用了三周时间,除了感叹一句后生可畏,由此也可以看出来和10倍程序员并肩作战的也都是10倍程序员,所以it团队的负责人在感叹自己没有18程序员相助时也要反思一下,自己是不是一位10程序员。

TdEngine为什么会火

传统数据库厂商的问题在于傲慢、自大,他们认为数据是零件,数据库则是各类零件的加中心,很多工序都是为数据的修改准备的,无论修改是否发生加工车间为了保证一致性,都会对流水线上的数据加上各种各样的锁。这些操作浪费了很多时间,而且几乎没有任何轻量级的框架,可供用户选择省略掉这些冗余操作。而且传统厂商为了解决数据库的性能问题不是从底层架构逻辑下手,而是不休止的在应用与数据库之间加入各种像REDIS,NGIX等等代理或者缓存层,这种方式其实是加大了各层级间的性能开销。传统厂商认为自己非常了解数据,但却忘了用户比厂商更加了解自己的数据,天下可谓苦秦久已。

而TdEngine是认为数据是信息流,它要做的非常简单,只是数据的录像机而已,信息调阅只要找到对应的录像带即可,这样的设计思路从底层逻辑上决定了td会是一款性能极高的产品。它更加贴合物联网时代的数据模型,而且代码只有10万行的量级,非常适合从从头开始学习。

所以TdEngine精确的找到了数据库市场的细分战场。他可以在相同的硬件条件下达到其它产品10倍的速度,完美解决了很多物联网,量化交易等场景的痛点。

TdEngine代码导读

当笔者打TdEngine的代码时不由眼前一亮,其代码风格及规范性绝对堪称一流,于是我打开了久违的souce insight,,再一次开始了阅读C语言代码的美妙旅程,在这里强烈推荐各位读者也来读一下,绝对堪称享受。

这里将给我启示最大的一段代码其链接在https://github.com/taosdata/TDengine/blob/master/src/util/src/tsched.c,向大家分享一下。鉴于本文肯定会分享给陶老师,所以估计会有作者亲答的环节:-),以下代码是一个典型的consumer-producer消息传递功能的实现,也就是有多个生产者(producer)生成并不断向队列中传递消息,也有多个消费者(consumer)不断从队列中取消息,而在java等高级语言中类似的功能已经被封装好了,这其实也让程序员无法了解线程间的同步和互斥机制。在正式进入到代码之前我想请大家思考这样的一个,互斥体( mutex)和信号量(semaphore)的使用是如何做到多线程安全的。

先来看结构体设计,具体我已经注释好了:

typedef struct {  char            label[16];//消息内容  sem_t           emptySem;//此信号量代表队列的可写状态  sem_t           fullSem;//此信号量代表队列的可读状态  pthread_mutex_t queueMutex;//此互斥体为保证消息不会被误修改,保证线程程安全  int             fullSlot;//队尾位置  int             emptySlot;//队头位置  int             queueSize;#队列长度  int             numOfThreads;//同时操作的线程数量  pthread_t *     qthread;//线程指针  SSchedMsg *     queue;//队列指针} SSchedQueue;

再来看初始化函数,这里需要特别说明的是,两个信号量的创建,其中emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度,fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读。具体代码及我的注释如下:

void *taosInitScheduler(int queueSize, int numOfThreads, char *label) {  pthread_attr_t attr;  SSchedQueue *  pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));  memset(pSched, 0, sizeof(SSchedQueue));  pSched->queueSize = queueSize;  pSched->numOfThreads = numOfThreads;  strcpy(pSched->label, label);  if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {    pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));    goto _error;  }   //emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度。  if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {    pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));    goto _error;  } //fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读  if (sem_init(&pSched->fullSem, 0, 0) != 0) {    pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));    goto _error;  }  if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {    pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));    goto _error;  }  memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));  pSched->fullSlot = 0;//实始化时队列为空,故队头和队尾的位置都是0  pSched->emptySlot = 0;//实始化时队列为空,故队头和队尾的位置都是0  pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);  pthread_attr_init(&attr);  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);  for (int i = 0; i < pSched->numOfThreads; ++i) {    if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {      pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));      goto _error;    }  }  pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);  return (void *)pSched; _error:  taosCleanUpScheduler(pSched);  return NULL;}

再来看读消息的taosProcessSchedQueue函数,这个主要逻辑是:

使用无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续向下处理

在操作msg前,加入互斥体防止msg被误用。

读操作完毕后修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。同时退出互斥体。

对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6

具体代码及注释如下:

void *taosProcessSchedQueue(void *param) {  SSchedMsg    msg;  SSchedQueue *pSched = (SSchedQueue *)param; //注意这里是个无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续处理  while (1) {    if (sem_wait(&pSched->fullSem) != 0) {      pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));      if (errno == EINTR) {        /* sem_wait is interrupted by interrupt, ignore and continue */        continue;      }    }     //加入互斥体防止msg被误用。    if (pthread_mutex_lock(&pSched->queueMutex) != 0)      pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));    msg = pSched->queue[pSched->fullSlot];    memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));    //读取完毕修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。    pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;     //读取完毕修改退出互斥体    if (pthread_mutex_unlock(&pSched->queueMutex) != 0)      pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno));     //读取完毕对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6    if (sem_post(&pSched->emptySem) != 0)      pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno));    if (msg.fp)      (*(msg.fp))(&msg);    else if (msg.tfp)      (*(msg.tfp))(msg.ahandle, msg.thandle);  }}

最后来看写消息的taosScheduleTask函数,其基本逻辑是:

写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。

加入互斥体防止msg被误操作,写入完成后退出互斥体

写队列完成后对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就继续向下。

int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {  SSchedQueue *pSched = (SSchedQueue *)qhandle;  if (pSched == NULL) {    pError("sched is not ready, msg:%p is dropped", pMsg);    return 0;  }  #在写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。  if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));#加入互斥体防止msg被误操作  if (pthread_mutex_lock(&pSched->queueMutex) != 0)    pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));  pSched->queue[pSched->emptySlot] = *pMsg;  pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;  if (pthread_mutex_unlock(&pSched->queueMutex) != 0)    pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));  #在写队列前先对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取函数可以进行处理。  if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));  return 0;}

当然以上只是TdEngine优美代码的一小部分,而且笔者解读的功力也十分有限,这里再次强烈建议大家下载全部源码仔细学习,定能受益匪浅。


5条回帖
136178  L0  评论于
(0)  回复(4) 1#
sublime text汉化破解版,有代码自动提示插件的,老哥有吗?
饲养员 楼主 :这个没有
发表在2019-07-29 回复
发表在2019-07-30 回复
136178 :回复 Walker感谢大佬! 这个问题和我之前的一样 代码自动提示插件 安装不上 老是报错
发表在2019-07-30 回复
136178 :回复 Walker:已解决,??下载插件网站的被墙了,等个vpn,就可以下了
发表在2019-07-30 回复
  
:)
还没注册帐号?快来注册社区帐号,和我们一起嗨起来!
关于本社区

集各类兴趣爱好于一身的轻量化交流社区,在此您可以和他人一起分享交流您觉得有价值的内容,社区鼓励大家发表原创内容,为社区添砖加瓦!

发帖奖励 → 社区版规 → 招聘版主 →
推荐版块
扫描二维码下载社区APP
回到顶部