前言
流量控制可以讓發送端根據接收端的實際接受能力控制發送的數據量。它的具體操作是,接收端主機向發送端主機通知自己可以接收數據的大小,于是發送端會發送不會超過該大小的數據,該限制大小即為窗口大小,即窗口大小由接收端主機決定。如播放視頻,音頻文件時,需要對發送的數據進行流控。
mycat實現
#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>#define BUF_SIZE 1024*1
#define BUF_SIZE 100
int main(int argc,char **argv)
{int fps,fpd=1;int ret;char buf[BUF_SIZE];int len,pos;if(argc <2){fprintf(stderr,"Usage:%s <src_file> <dest_file>\n",argv[0]);exit(1);}do{fps = open(argv[1],O_RDONLY);if(fps <0){if(errno != EINTR){perror("open");exit(1);}}}while(fps<0);while(1){len = read(fps,buf,BUF_SIZE);if(len < 0){if(errno == EINTR)continue;perror("read()");break;}if(len ==0)break;pos = 0;while(len > 0){ret = write(fpd,buf+pos,len);sleep(1);if(ret <0){if(errno == EINTR)continue;perror("write()");exit(1);}pos += ret;len-=ret;}}close(fps);return 0;
}
漏桶實現
#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>#define CPS 100
#define BUF_SIZE CPSstatic volatile int loop = 0;static void alrm_handler(int num)
{alarm(1);loop = 1;
}int main(int argc,char **argv)
{int fps,fpd=1;int ret;char buf[BUF_SIZE];int len,pos;if(argc <2){fprintf(stderr,"Usage:%s \n",argv[0]);exit(1);}signal(SIGALRM,alrm_handler);alarm(1);do{fps = open(argv[1],O_RDONLY);if(fps <0){if(errno != EINTR){perror("open");exit(1);}}}while(fps<0);while(1){//while(!loop);while(loop == 0)pause(); //用于等待一個打斷的到來。不加則是忙等。loop = 0;while(1){len = read(fps,buf,BUF_SIZE);if(len < 0){if(errno == EINTR)continue;perror("read()");break;}break;}if(len ==0)break;pos = 0;while(len > 0){ret = write(fpd,buf+pos,len);if(ret <0){if(errno == EINTR)continue;perror("write()");exit(1);}pos += ret;len-=ret;}}close(fps);return 0;
}
如果讀取的是打印機類的設備,并且當時打印機上面沒有數據,那么程序就會一直循環于 read處
所以 漏桶的缺陷就是 如果沒有數據的時候 就會一直循環等待,直到有數據,如果忽然來的數據量很大,也不能快速的去讀數據,只能慢慢的一秒10個字節的去讀n次
令牌桶實現
令牌桶的優勢,就是 當沒有數據可讀的時候,會積攢自己的權限,意思是 如果之前30秒一直沒有數據,讀空了30秒,那么就存下30個權限,等到有數據的時候,快速使用前面30個權限,快速連續讀30次。
#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>#define CPS 10
#define BUF_SIZE CPS
#define BURST 100static volatile sig_atomic_t token = 0; //信號原子類型,保證取值和賦值一定是一條指令去運行的static void alrm_handler(int num)
{alarm(1);token++;if(token >= BURST)token =BURST;
}int main(int argc,char **argv)
{int fps,fpd=1;int ret;char buf[BUF_SIZE];int len,pos;if(argc <2){fprintf(stderr,"Usage:%s \n",argv[0]);exit(1);}signal(SIGALRM,alrm_handler);alarm(1);do{fps = open(argv[1],O_RDONLY);if(fps <0){if(errno != EINTR){perror("open");exit(1);}}}while(fps<0);while(1){while(token <= 0)pause(); //用于等待一個打斷的到來。不加則是忙等。token--; // 操作不原子,可以出現--的時候出現++的情況while(1){len = read(fps,buf,BUF_SIZE); //讀的時候被信號打斷時token會++if(len < 0){if(errno == EINTR)continue;perror("read()");break;}break;}if(len ==0)break;pos = 0;while(len > 0){ret = write(fpd,buf+pos,len);if(ret <0){if(errno == EINTR)continue;perror("write()");exit(1);}pos += ret;len-=ret;}}close(fps);return 0;
}
令牌桶實現封裝
main.c
#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include "mytbf.h"#define CPS 10
#define BUF_SIZE 1024
#define BURST 100int main(int argc,char **argv)
{int fps,fpd=1;int ret;char buf[BUF_SIZE];int len,pos;if(argc <2){fprintf(stderr,"Usage:%s \n",argv[0]);exit(1);}mytbf_t* tbf;tbf = mytbf_init(CPS,BURST);if(tbf ==NULL){fprintf(stderr,"mytbf_init Error\n");exit(1);}do{fps = open(argv[1],O_RDONLY);if(fps <0){if(errno != EINTR){perror("open");exit(1);}}}while(fps<0);while(1){int size;size = mytbf_fetchtoken(tbf,BUF_SIZE);if(size <0){fprintf(stderr,"mytbf_fetchtoken:%s\n",strerror(-size));exit(1);}while(1){len = read(fps,buf,size); //讀的時候被信號打斷時token會++if(len < 0){if(errno == EINTR)continue;perror("read()");break;}break;}if(len ==0)break;if(size -len > 0) //如果讀到的token數比取出來的token數小,就把沒用到的token歸還。{mytbf_returntoken(tbf,size -len);}pos = 0;while(len > 0){ret = write(fpd,buf+pos,len);if(ret <0){if(errno == EINTR)continue;perror("write()");exit(1);}pos += ret;len-=ret;}}close(fps);mytbf_destroy(tbf);return 0;
}
mytbf.h
#ifndef MYTBF_H
#define MYTBF_H#define MYTBF_MAX 1024typedef void mytbf_t;mytbf_t* mytbf_init(int cps ,int burst); //C語言中,void*可以賦值給任何類型的指針,任何類型的指針也都可以賦值給void*int mytbf_fetchtoken(mytbf_t*,int); //獲取tokenint mytbf_returntoken(mytbf_t*,int); //返還tokenint mytbf_destroy(mytbf_t*);#endif
mytbf.c
#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>#include "mytbf.h"struct mytbf_st
{int cps;int burst;int token;int pos;};
static struct mytbf_st* job[MYTBF_MAX];
static int inited = 0;typedef void (*sighandler_t)(int);static sighandler_t alrm_handler_save;static int get_free_pos(void)
{for(int i=0;i< MYTBF_MAX;i++){if(job[i]==NULL)return i;}return -1;
}static void alrm_handler(int num)
{alarm(1);for(int i=0;i<MYTBF_MAX;i++){if(job[i] != NULL){job[i]->token += job[i]->cps;if(job[i]->token >job[i]->burst ){job[i]->token = job[i]->burst;}}}}static void module_unload()
{signal(SIGALRM,alrm_handler_save ); //關閉alarm注冊的行為,還原之前的行為alarm(0); //關閉時鐘信號for(int i=0;i<MYTBF_MAX;i++)free(job[i]);
}static void module_load()
{alrm_handler_save = signal(SIGALRM,alrm_handler); //定義新的行為,保存舊的行為alarm(1);atexit(module_unload);
}mytbf_t* mytbf_init(int cps ,int burst) //C語言中,void*可以賦值給任何類型的指針,任何類型的指針也都可以賦值給void*
{struct mytbf_st*me;int pos;if(inited == 0){module_load();inited = 1;}pos = get_free_pos();if(pos < 0)return NULL;me = malloc(sizeof(*me));if(me == NULL)return NULL;me->cps = cps;me->burst = burst;me->token = 0;me->pos = pos;job[pos] = me;return me;}
static int min(int token,int size)
{if(token> size)return size;return token;
}
int mytbf_fetchtoken(mytbf_t*ptr,int size) //獲取token
{if(size <= 0)return -EINVAL; //參數非法struct mytbf_st*me = ptr;while(me->token <= 0 ) //token為空就等待pause();// int n = min(me->token,size);int n = (me->token>size?size:me->token);me->token -= n;return n;
}int mytbf_returntoken(mytbf_t*ptr,int size) //返還token
{if(size<=0)return -EINVAL;struct mytbf_st*me = ptr;me->token+= size;if(me->token > me->burst)me->token = me->burst;return size;
}int mytbf_destroy(mytbf_t*ptr)
{struct mytbf_st *me;me = ptr;job[me->pos] = NULL;free(ptr);return 0;}
makefile
all:mytbf
CFLAGS=-g -Wall
mytbf:main.o mytbf.ogcc $^ $(CFLAGS) -o $@ clean:rm -rf *.o mytbf