Memcached Source Code Analysis: Starting from main()

(Advertisement break: I have recently been writing a data server in C on top of the LevelDB storage engine, using Libevent to handle network events and multiple background threads to serve client connections concurrently; in theory a single machine should support thousands to tens of thousands of client connections (untested). The framework is mostly in place, tentatively named LLDB (Libevent-based and Leveldb-backended DataBase). I will open-source it once the code matures, and I hope some of you will give it a try.)

Memcached is a distributed, in-memory caching server originally developed under Brad Fitzpatrick at Danga Interactive, the LiveJournal company. Its performance is very high, and it has become a key ingredient for scaling web applications at services such as mixi, hatena, Facebook, Vox, and LiveJournal (see Wikipedia or Baidu Baike for more background). Below is a rough walk-through of Memcached's startup flow, based on memcached-1.4.14; only the skeleton of the code is shown.

int main (int argc, char **argv) {
    int c;
    bool lock_memory = false;
    bool do_daemonize = false;
    bool preallocate = false;
    int maxcore = 0;
    char *username = NULL;
    char *pid_file = NULL;
    struct passwd *pw;
    struct rlimit rlim;
    char unit = '\0';
    int size_max = 0;
    int retval = EXIT_SUCCESS;
    /* listening sockets */
    static int *l_socket = NULL;
    /* udp listening socket (declared further down in the real source) */
    static int *u_socket = NULL;

    /* many more option variables (omitted) */

    /* sanity checks */
    if (!sanitycheck()) {
        return EX_OSERR;
    }

    /* register the signal handler */
    signal(SIGINT, sig_handler);

    /* initialize the settings to their default values */
    settings_init();

    /* parse the command line and fill in the memcached settings (code omitted) */

    /* if -S was given, initialize the SASL module */
    if (settings.sasl) {
        init_sasl();
    }

    /* run memcached as a daemon if requested */
    /* if we want to ensure our ability to dump core, don't chdir to / */
    if (do_daemonize) {
        if (sigignore(SIGHUP) == -1) {
            perror("Failed to ignore SIGHUP");
        }
        if (daemonize(maxcore, settings.verbose) == -1) {
            fprintf(stderr, "failed to daemon() in order to daemonize\n");
            exit(EXIT_FAILURE);
        }
    }

    /* initialize the main thread's libevent instance */
    main_base = event_init();

    /* initialize the other modules */
    stats_init();
    assoc_init(settings.hashpower_init);
    conn_init();
    slabs_init(settings.maxbytes, settings.factor, preallocate);

    /*
     * ignore SIGPIPE signals; we can use errno == EPIPE if we
     * need that information
     */
    if (sigignore(SIGPIPE) == -1) {
        perror("failed to ignore SIGPIPE; sigaction");
        exit(EX_OSERR);
    }

    /* start the worker threads if running in multithreaded mode */
    thread_init(settings.num_threads, main_base);

    /* start the assoc maintenance thread */
    if (start_assoc_maintenance_thread() == -1) {
        exit(EXIT_FAILURE);
    }

    /* start the slab maintenance thread */
    if (settings.slab_reassign &&
        start_slab_maintenance_thread() == -1) {
        exit(EXIT_FAILURE);
    }

    /* initialise the clock event handler */
    clock_handler(0, 0, 0);

    /* create unix mode sockets after dropping privileges */
    if (settings.socketpath != NULL) {
        errno = 0;
        if (server_socket_unix(settings.socketpath, settings.access)) {
            vperror("failed to listen on UNIX socket: %s", settings.socketpath);
            exit(EX_OSERR);
        }
    }

    /* create the listening sockets, bind them, and do the related setup */
    if (settings.socketpath == NULL) {
        const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
        char temp_portnumber_filename[PATH_MAX];
        FILE *portnumber_file = NULL;

        if (portnumber_filename != NULL) {
            snprintf(temp_portnumber_filename,
                     sizeof(temp_portnumber_filename),
                     "%s.lck", portnumber_filename);

            portnumber_file = fopen(temp_portnumber_filename, "a");
            if (portnumber_file == NULL) {
                fprintf(stderr, "Failed to open \"%s\": %s\n",
                        temp_portnumber_filename, strerror(errno));
            }
        }

        errno = 0;
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                            portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }

        /*
         * initialization order: first create the listening sockets
         * (may need root privileges for low ports), then drop root,
         * daemonize if requested, then initialize libevent
         */

        /* create the UDP listening socket and bind it */
        errno = 0;
        if (settings.udpport && server_sockets(settings.udpport, udp_transport,
                                               portnumber_file)) {
            vperror("failed to listen on UDP port %d", settings.udpport);
            exit(EX_OSERR);
        }

        if (portnumber_file) {
            fclose(portnumber_file);
            rename(temp_portnumber_filename, portnumber_filename);
        }
    }

    if (pid_file != NULL) {
        save_pid(pid_file);
    }

    /* drop privileges */
    drop_privileges();

    /* enter the event loop */
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;
    }

    stop_assoc_maintenance_thread();

    /* remove the PID file if we're a daemon */
    if (do_daemonize)
        remove_pidfile(pid_file);
    /* Clean up strdup() call for bind() address */
    if (settings.inter)
        free(settings.inter);
    if (l_socket)
        free(l_socket);
    if (u_socket)
        free(u_socket);

    return retval;
}

A few calls in main() deserve a closer look:

  • conn_init();
  • thread_init(settings.num_threads, main_base);
  • clock_handler(0, 0, 0);
  • server_socket_unix(settings.socketpath, settings.access)
  • server_sockets(settings.port, tcp_transport, portnumber_file);
  • event_base_loop(main_base, 0);

Before digging into these functions, let's look at a few important variables and structure definitions:

  • Important variable declarations:
static conn *listen_conn = NULL;
static struct event_base *main_base;
static conn **freeconns;
  • Definition of struct conn:
struct conn {
    int    sfd;
    sasl_conn_t *sasl_conn;
    enum conn_states  state;
    enum bin_substates substate;
    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    /** which state to go into after finishing current write */
    enum conn_states  write_and_go;
    void   *write_and_free; /** free this memory after finishing writing */

    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */
    void   *item;     /* for commands set/add/replace */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;

    char   **suffixlist;
    int    suffixsize;
    char   **suffixcurr;
    int    suffixleft;

    enum protocol protocol;           /* which protocol this connection speaks */
    enum network_transport transport; /* what transport is used by this connection */

    /* data for UDP clients */
    int    request_id;            /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr request_addr; /* Who sent the most recent request */
    socklen_t request_addr_size;
    unsigned char *hdrbuf;        /* udp packet headers */
    int    hdrsize;               /* number of headers' worth of space is allocated */

    bool   noreply;   /* True if the reply should not be sent. */
    /* current stats command */
    struct {
        char *buffer;
        size_t size;
        size_t offset;
    } stats;

    /* Binary protocol stuff */
    /* This is where the binary header goes */
    protocol_binary_request_header binary_header;
    uint64_t cas; /* the cas to return */
    short cmd;    /* current command being processed */
    int opaque;
    int keylen;
    conn   *next;            /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};
  • Definitions of LIBEVENT_THREAD and LIBEVENT_DISPATCHER_THREAD:
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
} LIBEVENT_DISPATCHER_THREAD;

Let's start with the conn_init() function:

static void conn_init(void) {
    freetotal = 200;
    freecurr = 0;
    if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
    }
    return;
}

All it does is allocate space for freetotal conn pointers; nothing fancy.
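For context, here is a condensed sketch of how this freelist is consumed and refilled elsewhere in memcached.c: conn_new() first tries to pop a cached conn, and a closing connection is pushed back for reuse. This follows the 1.4.x conn_from_freelist()/conn_add_to_freelist() only approximately (the multithreaded build wraps them in a mutex in thread.c):

static conn *conn_from_freelist(void) {
    conn *c = NULL;
    if (freecurr > 0)
        c = freeconns[--freecurr];   /* reuse a cached conn if we have one */
    return c;                        /* NULL tells the caller to allocate a fresh one */
}

static bool conn_add_to_freelist(conn *c) {
    if (freecurr < freetotal) {
        freeconns[freecurr++] = c;   /* cache the conn for reuse */
        return false;
    }
    /* freelist is full: try to double it */
    size_t newsize = freetotal * 2;
    conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
    if (new_freeconns == NULL)
        return true;                 /* caller should destroy the conn itself */
    freetotal = newsize;
    freeconns = new_freeconns;
    freeconns[freecurr++] = c;
    return false;
}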

Next comes another important call: thread_init().

/*
 * Initializes the thread subsystem, creating the worker threads.
 *
 * nthreads  is the number of worker event-handler threads to spawn
 * main_base is the main thread's event base
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;
    int power;

    /* initialize the locks */
    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }

    item_lock_count = ((unsigned long int)1 << (power));
    item_lock_mask  = item_lock_count - 1;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    /* set up the dispatcher_thread (i.e. the main thread) structure */
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        /* the trick: a worker learns that the main thread has accepted
         * an event by reading a single byte from notify_receive_fd */
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create the worker threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* The main thread waits for all the workers to set themselves up
     * before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe, which other threads can
 * write to in order to signal that a new connection has been placed
 * on its queue.
 */
static LIBEVENT_THREAD *threads;
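The striped item locks set up above deserve a note: instead of one global lock over the hash table, a key's hash value selects one of item_lock_count mutexes, so threads touching different buckets rarely contend. The accessors amount to the following (per the 1.4.x thread.c, where pthread_mutex_lock sits behind a mutex_lock() wrapper macro):

void item_lock(uint32_t hv) {
    /* hv is the key's hash; the mask picks one of the power-of-two locks */
    pthread_mutex_lock(&item_locks[hv & item_lock_mask]);
}

void item_unlock(uint32_t hv) {
    pthread_mutex_unlock(&item_locks[hv & item_lock_mask]);
}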

Inside thread_init(), setup_thread() is called to set up each worker thread's per-thread state.

static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

setup_thread() also registers the worker's notification callback, thread_libevent_process():

/*
 * Called whenever a worker thread's wakeup pipe receives notice
 * that a new connection has arrived.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}
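cq_pop() above takes items off the worker's connection queue, which is just a mutex-protected singly linked list (cq_push() appends at the tail). A sketch of its likely shape, with the CQ_ITEM fields reduced to the essentials and a condition variable present in the real struct omitted:

typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;          /* plus init_state, event_flags, etc., as seen above */
    CQ_ITEM *next;
};

typedef struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
} CQ;

static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)     /* the queue just became empty */
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;                  /* NULL when nothing is queued */
}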

thread_init() also calls create_worker() to spawn each worker thread, with worker_libevent() as the thread's entry point:

/*
 * Worker thread: the event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* thread_init() blocks until all of the threads have finished
     * initializing */
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    /* the worker enters its event loop */
    event_base_loop(me->base, 0);
    return NULL;
}

At this point thread_init() returns.

The next important call is server_sockets(), which calls server_socket(); server_socket() in turn calls conn_new(), and conn_new() registers event_handler() as the libevent callback for the connection.
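The wiring itself is only a few libevent calls. Here is a condensed sketch of what conn_new() does (the wrapper name register_conn_event is mine for illustration; the allocation and state setup around it are omitted):

/* Bind the socket's readiness events to event_handler, passing the
 * conn itself as the callback argument. */
static bool register_conn_event(conn *c, int sfd, int event_flags,
                                struct event_base *base) {
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;
    return event_add(&c->event, 0) != -1;
}

event_handler() itself is short: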

void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }

    drive_machine(c);

    /* wait for next event */
    return;
}

drive_machine() is essentially one big state machine, and the function is long:

static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;

    assert(c != NULL);

    while (!stop) {

        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don't log anything */
                    stop = true;
                } else if (errno == EMFILE) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                perror("setting O_NONBLOCK");
                close(sfd);
                break;
            }

            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                str = "ERROR Too many open connections\r\n";
                res = write(sfd, str, strlen(str));
                close(sfd);
                STATS_LOCK();
                stats.rejected_conns++;
                STATS_UNLOCK();
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                  DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;

        case conn_waiting:
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
            stop = true;
            break;

        case conn_read:
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

            switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            break;

        case conn_parse_cmd :
            if (try_read_command(c) == 0) {
                /* we need more data! */
                conn_set_state(c, conn_waiting);
            }
            break;

        case conn_new_cmd:
            /* Only process nreqs at a time to avoid starving other
               connections */
            --nreqs;
            if (nreqs >= 0) {
                reset_cmd_handler(c);
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {
                    /* We have already read in data into the input buffer,
                       so libevent will most likely not signal read events
                       on the socket (unless more data is available. As a
                       hack we should just put in a request to write data,
                       because that should be possible ;-)
                    */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Couldn't update event\n");
                        conn_set_state(c, conn_closing);
                    }
                }
                stop = true;
            }
            break;

        case conn_nread:
            if (c->rlbytes == 0) {
                complete_nread(c);
                break;
            }
            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                if (c->ritem != c->rcurr) {
                    memmove(c->ritem, c->rcurr, tocopy);
                }
                c->ritem += tocopy;
                c->rlbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                if (c->rlbytes == 0) {
                    break;
                }
            }

            /* now try reading from the socket */
            res = read(c->sfd, c->ritem, c->rlbytes);
            if (res > 0) {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.bytes_read += res;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rcurr == c->ritem) {
                    c->rcurr += res;
                }
                c->ritem += res;
                c->rlbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0) {
                fprintf(stderr, "Failed to read, and not due to blocking:\n"
                        "errno: %d %s \n"
                        "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
                        errno, strerror(errno),
                        (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
                        (int)c->rlbytes, (int)c->rsize);
            }
            conn_set_state(c, conn_closing);
            break;

        case conn_swallow:
            /* we are reading sbytes and throwing them away */
            if (c->sbytes == 0) {
                conn_set_state(c, conn_new_cmd);
                break;
            }

            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
                c->sbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                break;
            }

            /* now try reading from the socket */
            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
            if (res > 0) {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.bytes_read += res;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                c->sbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0)
                fprintf(stderr, "Failed to read, and not due to blocking\n");
            conn_set_state(c, conn_closing);
            break;

        case conn_write:
            /*
             * We want to write out a simple response. If we haven't already,
             * assemble it into a msgbuf list (this will be a single-entry
             * list for TCP or a two-entry list for UDP).
             */
            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
                if (add_iov(c, c->wcurr, c->wbytes) != 0) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't build response\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
            }

            /* fall through... */

        case conn_mwrite:
            if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Failed to build UDP headers\n");
                conn_set_state(c, conn_closing);
                break;
            }
            switch (transmit(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {
                    while (c->ileft > 0) {
                        item *it = *(c->icurr);
                        assert((it->it_flags & ITEM_SLABBED) == 0);
                        item_remove(it);
                        c->icurr++;
                        c->ileft--;
                    }
                    while (c->suffixleft > 0) {
                        char *suffix = *(c->suffixcurr);
                        cache_free(c->thread->suffix_cache, suffix);
                        c->suffixcurr++;
                        c->suffixleft--;
                    }
                    /* XXX: I don't know why this wasn't the general case */
                    if (c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {
                        conn_set_state(c, conn_new_cmd);
                    }
                } else if (c->state == conn_write) {
                    if (c->write_and_free) {
                        free(c->write_and_free);
                        c->write_and_free = 0;
                    }
                    conn_set_state(c, c->write_and_go);
                } else {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Unexpected state %d\n", c->state);
                    conn_set_state(c, conn_closing);
                }
                break;

            case TRANSMIT_INCOMPLETE:
            case TRANSMIT_HARD_ERROR:
                break;                   /* Continue in state machine. */

            case TRANSMIT_SOFT_ERROR:
                stop = true;
                break;
            }
            break;

        case conn_closing:
            if (IS_UDP(c->transport))
                conn_cleanup(c);
            else
                conn_close(c);
            stop = true;
            break;

        case conn_max_state:
            assert(false);
            break;
        }
    }

    return;
}

You could say that all of memcached revolves around this state machine. The possible states are:

enum conn_states {
    conn_listening,  /**< the socket which listens for new connections */
    conn_new_cmd,    /**< prepare the connection for the next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_max_state   /**< maximum state value (used for assertions) */
};
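State transitions go through conn_set_state(), which uses conn_max_state to assert that the target state is valid. Condensed here, with the verbose logging and DTrace probes of the real function left out:

static void conn_set_state(conn *c, enum conn_states state) {
    assert(c != NULL);
    /* conn_max_state bounds the legal range of states */
    assert(state >= conn_listening && state < conn_max_state);
    if (state != c->state)
        c->state = state;
}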

In the conn_listening state, the main thread accepts the new client connection and hands it off via dispatch_conn_new():

/*
 * Dispatches a new connection to another thread. This is only ever
 * called from the main thread, either during initialization (for UDP)
 * or because of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    /* pick the next worker round-robin */
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

That covers most of the logic of the main thread and the worker threads; each now sits in its own event loop, handling its share of the work.

讀后語:memcached 代碼簡介易讀,基于 Libevent 處理網絡事件,并采用了多線程機制,大大利用了多核的計算能力,提高了系統接受客戶端請求并發數量。

The way the main thread notifies workers of new connections is also worth noting. Each worker shares a pipe with the main thread: whenever the main thread accepts a new connection, it dispatches the connection onto one worker's queue and writes a single byte into that worker's pipe. Since the worker's event loop watches the read end of the pipe, it wakes up, pops the connection off its queue, and takes over serving it. A minimal, self-contained sketch of this pattern is shown below.
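This toy program uses the same libevent 1.4-style APIs (event_init/event_set) as memcached. The names worker_main and on_notify are mine, and unlike memcached the worker breaks out of its loop after one wakeup so the demo terminates. Build with -levent -lpthread:

#include <event.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int notify_receive_fd;   /* worker reads from here  */
static int notify_send_fd;      /* main thread writes here */

/* Worker-side callback: one byte on the pipe means "work was queued". */
static void on_notify(int fd, short which, void *arg) {
    struct event_base *base = arg;
    char buf[1];
    if (read(fd, buf, 1) == 1)
        printf("worker: woken up; memcached would cq_pop() a conn here\n");
    event_base_loopbreak(base);  /* end the demo; memcached keeps looping */
}

static void *worker_main(void *arg) {
    struct event_base *base = event_init();   /* per-thread event base */
    struct event notify_event;
    event_set(&notify_event, notify_receive_fd,
              EV_READ | EV_PERSIST, on_notify, base);
    event_base_set(base, &notify_event);
    event_add(&notify_event, 0);
    event_base_loop(base, 0);                 /* the worker's event loop */
    return NULL;
}

int main(void) {
    int fds[2];
    pthread_t tid;
    if (pipe(fds)) { perror("pipe"); return 1; }
    notify_receive_fd = fds[0];
    notify_send_fd = fds[1];
    pthread_create(&tid, NULL, worker_main, NULL);
    /* the "dispatch": in memcached this follows cq_push(); here we
     * only write the one-byte wakeup */
    if (write(notify_send_fd, "", 1) != 1)
        perror("writing to notify pipe");
    pthread_join(tid, NULL);
    return 0;
}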
