首先,在 Redis 4.0.11 中,bio 模块留给外部调用的接口如下:
点击(此处)折叠或打开
- /* Exported API */
- void bioInit(void); /* Set up the per-type state and spawn one worker thread per job type. */
- void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); /* Queue a job of the given type and signal its worker. */
- unsigned long long bioPendingJobsOfType(int type); /* Read the pending-job counter of the type under its mutex. */
- unsigned long long bioWaitStepOfType(int type); /* If jobs are pending, wait for one step signal from the worker, then return the pending counter. */
- time_t bioOlderJobOfType(int type); /* Definition not shown in this excerpt. */
- void bioKillThreads(void); /* Cancel and join every worker thread, logging the outcome. */
点击(此处)折叠或打开
- /* Background job opcodes. The value is also used as the index into the
- * per-type arrays (threads, mutexes, condition variables, queues). */
- #define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
- #define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
- #define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
- #define BIO_NUM_OPS 3 /* Number of opcodes above; sizes every per-type array. */
为了让以上三种任务互不干扰地分别执行,Redis 为每一种任务类型都构建了如下一组变量(数组下标即任务类型):
点击(此处)折叠或打开
- static pthread_t bio_threads[BIO_NUM_OPS]; /* Worker thread handle for each job type. */
- static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; /* Held while touching bio_jobs[] / bio_pending[] of that type. */
- static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; /* Signaled when a job is queued; the worker waits on it while its queue is empty. */
- static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; /* Broadcast by the worker after each processed job; bioWaitStepOfType() waits on it. */
- static list *bio_jobs[BIO_NUM_OPS]; /* Queue of struct bio_job: appended at the tail, consumed from the head. */
- static unsigned long long bio_pending[BIO_NUM_OPS]; /* Jobs queued and not yet removed from the queue by the worker. */
后台任务是用如下结构来表示的
点击(此处)折叠或打开
- /* This structure represents a background Job. It is only used locally to this
- * file as the API does not expose the internals at all. */
- struct bio_job {
- time_t time; /* Time at which the job was created (time(NULL) when it was queued). */
- /* Job specific arguments pointers. If we need to pass more than three
- * arguments we can just pass a pointer to a structure or alike.
- * Their meaning depends on the job type: the close and fsync jobs read
- * arg1 as an integer cast to a pointer, while lazy-free jobs decide what
- * to free from whichever of the three pointers is non-NULL. */
- void *arg1, *arg2, *arg3;
- };
点击(此处)折叠或打开
- /* Initialize the background system, spawning the threads.
- *
- * For every job type this sets up the mutex, the two condition variables,
- * an empty job list and a zeroed pending counter, then starts one worker
- * thread running bioProcessBackgroundJobs() with the job type as its
- * argument. If a thread cannot be created the error is logged and the
- * process exits with status 1. */
- void bioInit(void) {
- pthread_attr_t attr;
- pthread_t thread;
- size_t stacksize;
- int j;
- /* Initialization of state vars and objects */
- for (j = 0; j < BIO_NUM_OPS; j++) {
- pthread_mutex_init(&bio_mutex[j],NULL);
- pthread_cond_init(&bio_newjob_cond[j],NULL);
- pthread_cond_init(&bio_step_cond[j],NULL);
- bio_jobs[j] = listCreate();
- bio_pending[j] = 0;
- }
- /* Set the stack size as by default it may be small in some system.
- * Starting from the default, double it until it reaches at least
- * REDIS_THREAD_STACK_SIZE. */
- pthread_attr_init(&attr);
- pthread_attr_getstacksize(&attr,&stacksize);
- if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
- while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
- pthread_attr_setstacksize(&attr, stacksize);
- /* Ready to spawn our threads. We use the single argument the thread
- * function accepts in order to pass the job ID the thread is
- * responsible of. */
- for (j = 0; j < BIO_NUM_OPS; j++) {
- void *arg = (void*)(unsigned long) j;
- if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
- serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
- exit(1);
- }
- bio_threads[j] = thread;
- }
- /* The attributes object is only needed while creating the threads;
- * destroying it has no effect on the threads already started. */
- pthread_attr_destroy(&attr);
- }
点击(此处)折叠或打开
- /* Worker thread body: consume and execute the jobs queued for one job type.
- *
- * 'arg' carries the job type as an integer cast to a pointer. The thread
- * loops forever: it sleeps on bio_newjob_cond[type] while the queue is
- * empty, otherwise it takes the job at the head of the queue, runs it with
- * the mutex released, and then removes it from the queue. It only returns
- * (NULL) when started with an out-of-range type. */
- void *bioProcessBackgroundJobs(void *arg) {
- struct bio_job *job;
- unsigned long type = (unsigned long) arg;
- sigset_t sigset;
- /* Check that the type is within the right interval. */
- if (type >= BIO_NUM_OPS) {
- serverLog(LL_WARNING,
- "Warning: bio thread started with wrong type %lu",type);
- return NULL;
- }
- /* Make the thread killable at any time, so that bioKillThreads()
- * can work reliably. */
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
- pthread_mutex_lock(&bio_mutex[type]);
- /* Block SIGALRM so we are sure that only the main thread will
- * receive the watchdog signal. */
- sigemptyset(&sigset);
- sigaddset(&sigset, SIGALRM);
- if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
- serverLog(LL_WARNING,
- "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
- while(1) {
- listNode *ln;
- /* The loop always starts with the lock hold. */
- if (listLength(bio_jobs[type]) == 0) {
- pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
- continue;
- }
- /* Take the job at the head of the queue. The node itself stays in
- * the list (and the job stays counted as pending) until the job has
- * been executed. */
- ln = listFirst(bio_jobs[type]);
- job = ln->value;
- /* It is now possible to unlock the background system as we know have
- * a stand alone job structure to process.*/
- pthread_mutex_unlock(&bio_mutex[type]);
- /* Process the job accordingly to its type. */
- if (type == BIO_CLOSE_FILE) {
- close((long)job->arg1);
- } else if (type == BIO_AOF_FSYNC) {
- aof_fsync((long)job->arg1);
- } else if (type == BIO_LAZY_FREE) {
- /* What we free changes depending on what arguments are set:
- * arg1 -> free the object at pointer.
- * arg2 & arg3 -> free two dictionaries (a Redis DB).
- * only arg3 -> free the skiplist. */
- if (job->arg1)
- lazyfreeFreeObjectFromBioThread(job->arg1);
- else if (job->arg2 && job->arg3)
- lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
- else if (job->arg3)
- lazyfreeFreeSlotsMapFromBioThread(job->arg3);
- } else {
- serverPanic("Wrong job type in bioProcessBackgroundJobs().");
- }
- zfree(job);
- /* Lock again before reiterating the loop, if there are no longer
- * jobs to process we'll block again in pthread_cond_wait(). */
- pthread_mutex_lock(&bio_mutex[type]);
- listDelNode(bio_jobs[type],ln);
- bio_pending[type]--;
- /* Unblock threads blocked on bioWaitStepOfType() if any. This is done
- * with the mutex held and only after the counter has been updated:
- * otherwise a waiter could observe the old pending count, or start
- * waiting right after the last broadcast and never be woken up. */
- pthread_cond_broadcast(&bio_step_cond[type]);
- }
- }
点击(此处)折叠或打开
- /* Queue a new background job of the given type.
- *
- * The job records the three opaque arguments and its creation time, is
- * appended to the tail of the queue for 'type', and the worker waiting on
- * that queue is signaled. 'type' is used directly as an array index. */
- void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
- struct bio_job *entry = zmalloc(sizeof(struct bio_job));
- entry->arg1 = arg1;
- entry->arg2 = arg2;
- entry->arg3 = arg3;
- entry->time = time(NULL);
- /* Queue, counter and wake-up are all updated under the per-type lock. */
- pthread_mutex_lock(&bio_mutex[type]);
- listAddNodeTail(bio_jobs[type],entry);
- bio_pending[type] += 1;
- pthread_cond_signal(&bio_newjob_cond[type]);
- pthread_mutex_unlock(&bio_mutex[type]);
- }
点击(此处)折叠或打开
- /* Report how many jobs of type 'type' are currently pending. The counter
- * is sampled while holding the mutex of that job type. */
- unsigned long long bioPendingJobsOfType(int type) {
- pthread_mutex_t *lock = &bio_mutex[type];
- unsigned long long pending;
- pthread_mutex_lock(lock);
- pending = bio_pending[type];
- pthread_mutex_unlock(lock);
- return pending;
- }
点击(此处)折叠或打开
- /* If there are pending jobs of the given type, block once on the step
- * condition variable of that type, then return the number of jobs that are
- * still pending. When nothing is pending the function returns 0 at once
- * without waiting. */
- unsigned long long bioWaitStepOfType(int type) {
- unsigned long long remaining;
- pthread_mutex_lock(&bio_mutex[type]);
- /* Wait at most one time: the counter is simply re-read afterwards. */
- if (bio_pending[type] != 0)
- pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
- remaining = bio_pending[type];
- pthread_mutex_unlock(&bio_mutex[type]);
- return remaining;
- }
点击(此处)折叠或打开
- /* Cancel every background thread and wait for it to terminate.
- *
- * For each job type the worker thread is cancelled; when the cancellation
- * request succeeds the thread is joined and the outcome (terminated, or
- * join failure with its error string) is logged. Threads for which
- * pthread_cancel() fails are skipped without logging. */
- void bioKillThreads(void) {
- int err, j;
- for (j = 0; j < BIO_NUM_OPS; j++) {
- if (pthread_cancel(bio_threads[j]) == 0) {
- if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
- /* pthread_join() returns the error number directly. */
- serverLog(LL_WARNING,
- "Bio thread for job type #%d can not be joined: %s",
- j, strerror(err));
- } else {
- serverLog(LL_WARNING,
- "Bio thread for job type #%d terminated",j);
- }
- }
- }
- }