每一个大型的项目,都会必须要设计log,log是重要的调试手段,也是很好的学习入口。跟踪log可以让一个新手快速的理解代码,分析log可以帮助工程师很好的定位问题。
下面通过跟踪ceph-mon这个可执行文件,了解ceph中的log实现。
ceph_mon 初始化中 会调用global_init, global_init一开始就会调用 common_preinit函数
下面通过跟踪ceph-mon这个可执行文件,了解ceph中的log实现。
ceph_mon 初始化中 会调用global_init, global_init一开始就会调用 common_preinit函数
创建一个重要的数据结构CephContext cct。
- CephContext::CephContext(uint32_t module_type_)
- : nref(1),
- _conf(new md_config_t()),
- _log(NULL),
- _module_type(module_type_),
- _service_thread(NULL),
- _log_obs(NULL),
- _admin_socket(NULL),
- _perf_counters_collection(NULL),
- _perf_counters_conf_obs(NULL),
- _heartbeat_map(NULL),
- _crypto_none(NULL),
- _crypto_aes(NULL)
- {
- pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);
-
- _log = new ceph::log::Log(&_conf->subsys);
- _log->start();
-
- _log_obs = new LogObs(_log);
- _conf->add_observer(_log_obs);
-
- _perf_counters_collection = new PerfCountersCollection(this);
- _admin_socket = new AdminSocket(this);
- _heartbeat_map = new HeartbeatMap(this);
-
- _admin_hook = new CephContextHook(this);
- _admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
- _admin_socket->register_command("1", "1", _admin_hook, "");
- _admin_socket->register_command("perf dump", "perf dump", _admin_hook, "dump perfcounters value");
- _admin_socket->register_command("perfcounters_schema", "perfcounters_schema", _admin_hook, "");
- _admin_socket->register_command("2", "2", _admin_hook, "");
- _admin_socket->register_command("perf schema", "perf schema", _admin_hook, "dump perfcounters schema");
- _admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");
- _admin_socket->register_command("config set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set [ ...]: set a config variable");
- _admin_socket->register_command("config get", "config get name=var,type=CephString", _admin_hook, "config get : get the config value");
- _admin_socket->register_command("log flush", "log flush", _admin_hook, "flush log entries to log file");
- _admin_socket->register_command("log dump", "log dump", _admin_hook, "dump recent log entries to log file");
- _admin_socket->register_command("log reopen", "log reopen", _admin_hook, "reopen log file");
-
- _crypto_none = new CryptoNone;
- _crypto_aes = new CryptoAES;
- }
这个函数并不长,但是绝不简单,本文就是介绍Log线程
- _log = new ceph::log::Log(&_conf->subsys);
- _log->start();
首先是创建了成员变量_log
第一句 new 是创建了Log实例:基本就是初始化
1 初始化自旋锁 m_lock
2 初始化互斥量 m_flush_mutex 和m_queue_mutex
3 初始化条件变量 m_con_logger 和m_cond_flusher
- Log::Log(SubsystemMap *s)
- : m_indirect_this(NULL),
- m_subs(s),
- m_new(), m_recent(),
- m_fd(-1),
- m_syslog_log(-2), m_syslog_crash(-2),
- m_stderr_log(1), m_stderr_crash(-1),
- m_stop(false),
- m_max_new(DEFAULT_MAX_NEW),
- m_max_recent(DEFAULT_MAX_RECENT)
- {
- int ret;
-
- ret = pthread_spin_init(&m_lock, PTHREAD_PROCESS_SHARED);
- assert(ret == 0);
-
- ret = pthread_mutex_init(&m_flush_mutex, NULL);
- assert(ret == 0);
-
- ret = pthread_mutex_init(&m_queue_mutex, NULL);
- assert(ret == 0);
-
- ret = pthread_cond_init(&m_cond_loggers, NULL);
- assert(ret == 0);
-
- ret = pthread_cond_init(&m_cond_flusher, NULL);
- assert(ret == 0);
-
- // kludge for prealloc testing
- if (false)
- for (int i=0; i < PREALLOC; i++)
- m_recent.enqueue(new Entry);
- }
接下来启动了log线程:
- void Log::start()
- {
- assert(!is_started());
- pthread_mutex_lock(&m_queue_mutex);
- m_stop = false;
- pthread_mutex_unlock(&m_queue_mutex);
- create();
- }
关键一句是create,这个create从哪里冒出来的? 这个调用的是 Thread里面的create 方法:
- int Thread::try_create(size_t stacksize)
- {
- pthread_attr_t *thread_attr = NULL;
- stacksize &= CEPH_PAGE_MASK; // must be multiple of page
- if (stacksize) {
- thread_attr = (pthread_attr_t*) malloc(sizeof(pthread_attr_t));
- if (!thread_attr)
- return -ENOMEM;
- pthread_attr_init(thread_attr);
- pthread_attr_setstacksize(thread_attr, stacksize);
- }
- int r;
- // The child thread will inherit our signal mask. Set our signal mask to
- // the set of signals we want to block. (It's ok to block signals more
- // signals than usual for a little while-- they will just be delivered to
- // another thread or delieverd to this thread later.)
- sigset_t old_sigset;
- if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
- block_signals(NULL, &old_sigset);
- }
- else {
- int to_block[] = { SIGPIPE , 0 };
- block_signals(to_block, &old_sigset);
- }
- r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
- restore_sigset(&old_sigset);
- if (thread_attr)
- free(thread_attr);
- return r;
- }
- void Thread::create(size_t stacksize)
- {
- int ret = try_create(stacksize);
- if (ret != 0) {
- char buf[256];
- snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
- "failed with error %d", ret);
- dout_emergency(buf);
- assert(ret == 0);
- }
- }
创建线程,该方法做了下面的事情:
1 可以设置线程栈的size
2 对于daemon进程,会创建线程前会阻塞SIGPIPE
3 调用pthread_create函数创建线程
等一下,线程执行什么函数? _entry_func到底是什么?
也就是说线程执行的函数,记录在pthread_create的第四个变量的里面:
即Log的类中,有成员函数 entry,即线程应该执行的函数:
- void *Thread::_entry_func(void *arg) {
- void *r = ((Thread*)arg)->entry();
- return r;
- }
- namespace ceph {
- namespace log {
- class Log : private Thread
- {
- Log **m_indirect_this;
- SubsystemMap *m_subs;
-
- pthread_spinlock_t m_lock;
- pthread_mutex_t m_queue_mutex;
- pthread_mutex_t m_flush_mutex;
- pthread_cond_t m_cond_loggers;
- pthread_cond_t m_cond_flusher;
- EntryQueue m_new; ///< new entries
- EntryQueue m_recent; ///< recent (less new) entries we've already written at low detail
- std::string m_log_file;
- int m_fd;
- int m_syslog_log, m_syslog_crash;
- int m_stderr_log, m_stderr_crash;
- bool m_stop;
- int m_max_new, m_max_recent;
- void *entry();
- void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);
- void _log_message(const char *s, bool crash);
- public:
- Log(SubsystemMap *s);
- virtual ~Log();
- void set_flush_on_exit();
- void set_max_new(int n);
- void set_max_recent(int n);
- void set_log_file(std::string fn);
- void reopen_log_file();
- void flush();
- void dump_recent();
- void set_syslog_level(int log, int crash);
- void set_stderr_level(int log, int crash);
- Entry *create_entry(int level, int subsys);
- void submit_entry(Entry *e);
- void start();
- void stop();
- };
- }
- }
- void *Log::entry()
- {
- pthread_mutex_lock(&m_queue_mutex);
- while (!m_stop) {
- if (!m_new.empty()) {
- pthread_mutex_unlock(&m_queue_mutex);
- flush();
- pthread_mutex_lock(&m_queue_mutex);
- continue;
- }
- pthread_cond_wait(&m_cond_flusher, &m_queue_mutex);
- }
- pthread_mutex_unlock(&m_queue_mutex);
- flush();
- return NULL;
- }
这就是log线程执行的动作。
1 m_stop来控制线程是否终止
2 如果m_new 这个队列不空,就调用flush,负责写入log
3 如果队列空了,条件等待,有新的log出现在队列上,会通知到这个线程
暂且不管如果控制线程,谁来通知log线程有新的log,直接看下线程的主要工作,flush主要工作如下
- void Log::flush()
- {
- pthread_mutex_lock(&m_flush_mutex);
- pthread_mutex_lock(&m_queue_mutex);
- EntryQueue t;
- t.swap(m_new);
- pthread_cond_broadcast(&m_cond_loggers);
- pthread_mutex_unlock(&m_queue_mutex);
- _flush(&t, &m_recent, false);
- // trim
- //m_recent有一个最大值,超出了最大值,就从队列中删除最老的log,内存也就释放了
- while (m_recent.m_len > m_max_recent) {
- delete m_recent.dequeue();
- }
- pthread_mutex_unlock(&m_flush_mutex);
- }
首先,创建一个临时队列,t,执行swap将m_new里面的log都接了过去。所谓接过去,不过是将指针指向队列内容的事情交给临时队列,m_new 头指针和尾指针置成NULL
做了这件事之后,m_new又变成了空的队列,好心地给其他线程发了广播之后,就可以解锁m_queue_mutex互斥量了。
swap操作比较简单,就是交换指针的彼此指向:
然后将主要的写日志的工作就委托给了__flush函数。
可以看出,有的log需要写入日志文件,有的需要写入syslog,有些日志需要写入stderr
同时,日志还有一个优先级的概念,因此会根据should_log 来控制。
接下来是common字段,日志总免不了要记录消息的发生时间。:
这个m_stamp是utime_t类型,他的sprintf实现如下:
因此log的前缀是
接下来的字段是,线程的线程ID以及 消息的优先级,所谓线程ID并不是调度意义上的ID,而是pthread_self的返回值那个ID
ceph-mon是多线程的程序:
因此,我们可以取来一条log查看下ceph log的前缀:
值得一提的是,ceph维护了一个m_recent队列,所有的消息都会存放到该队列中去,哪怕优先级比较低,不会打印到日志文件中去。
这就是_flush 函数中的 enqueue做的事情:
当然了,队列是有大小的限制,否则队列就会膨胀,导致内存耗尽。这个默认的限制是10000.
ceph提供了方法来查看最近的log,这就是log dump方法。
表面看啥也没输出,实际上将log dump到了日志文件中,比如我们例子,在log文件中出现:
很有意思的是,ceph如何做到 ceph daemon /var/run/ceph/ceph-mon.*asok log dump 就把log dump到日志文件的。 这就牵扯到admin socket机制了。
admin socket 机制并不是一个非常新的机制,这是一个很commmon的设计。
很多进程,设计的时候,需要设计一个手段,能够运行时,接收用户的指令,不能将程序设计成一个黑匣子,相反,要提供一些手段,使是用户或者可以干预进程的运行(如改变配置项),或者可以获取到进程运行的状态信息。
admin socket机制,那是下一篇的任务。
swap操作比较简单,就是交换指针的彼此指向:
- void swap(EntryQueue& other) {
- int len = m_len;
- struct Entry *h = m_head, *t = m_tail;
- m_len = other.m_len;
- m_head = other.m_head;
- m_tail = other.m_tail;
- other.m_len = len;
- other.m_head = h;
- other.m_tail = t;
- }
然后将主要的写日志的工作就委托给了__flush函数。
可以看出,有的log需要写入日志文件,有的需要写入syslog,有些日志需要写入stderr
同时,日志还有一个优先级的概念,因此会根据should_log 来控制。
- void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
- {
- Entry *e;
- char buf[80];
- while ((e = t->dequeue()) != NULL) {
- unsigned sub = e->m_subsys;
- bool should_log = crash || m_subs->get_log_level(sub) >= e->m_prio;
- bool do_fd = m_fd >= 0 && should_log;
- bool do_syslog = m_syslog_crash >= e->m_prio && should_log;
- bool do_stderr = m_stderr_crash >= e->m_prio && should_log;
- if (do_fd || do_syslog || do_stderr) {
- int buflen = 0;
- if (crash)
- buflen += snprintf(buf, sizeof(buf), "%6d> ", -t->m_len);
- buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
- buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
- (unsigned long)e->m_thread, e->m_prio);
- // FIXME: this is slow
- string s = e->get_str();
- if (do_fd) {
- int r = safe_write(m_fd, buf, buflen);
- if (r >= 0)
- r = safe_write(m_fd, s.data(), s.size());
- if (r >= 0)
- r = write(m_fd, "\n", 1);
- if (r < 0)
- cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
- }
- if (do_syslog) {
- syslog(LOG_USER, "%s%s", buf, s.c_str());
- }
- if (do_stderr) {
- cerr << buf << s << std::endl;
- }
- }
- requeue->enqueue(e);
- }
- }
- buflen += e->m_stamp.sprintf(buf + buflen, sizeof(buf)-buflen);
- int sprintf(char *out, int outlen) const {
- struct tm bdt;
- time_t tt = sec();
- localtime_r(&tt, &bdt);
-
- return snprintf(out, outlen,
- "%04d-%02d-%02d %02d:%02d:%02d.%06ld",
- bdt.tm_year + 1900, bdt.tm_mon + 1, bdt.tm_mday,
- bdt.tm_hour, bdt.tm_min, bdt.tm_sec, usec());
- }
- 2015-05-29 10:20:45.076105
- buflen += snprintf(buf + buflen, sizeof(buf)-buflen, " %lx %2d ",
- (unsigned long)e->m_thread, e->m_prio);
- root@test3:/var/log/ceph# pidof ceph--mon
- root@test3:/var/log/ceph# pidof ceph-mon
- 138812
- root@test3:/var/log/ceph#
- root@test3:/var/log/ceph# ll /proc/138812/task
- total 0
- dr-xr-xr-x 23 root root 0 May 29 16:17 ./
- dr-xr-xr-x 9 root root 0 May 29 16:11 ../
- dr-xr-xr-x 6 root root 0 May 29 16:17 138812/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138813/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138814/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138815/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138816/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138817/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138818/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138819/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138820/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138821/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138822/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138823/
- dr-xr-xr-x 6 root root 0 May 29 16:17 138824/
- dr-xr-xr-x 6 root root 0 May 29 16:17 139643/
- dr-xr-xr-x 6 root root 0 May 29 16:17 139895/
- dr-xr-xr-x 6 root root 0 May 29 16:17 139896/
- dr-xr-xr-x 6 root root 0 May 29 16:17 143413/
- dr-xr-xr-x 6 root root 0 May 30 16:47 78042/
- dr-xr-xr-x 6 root root 0 May 30 16:47 78044/
- dr-xr-xr-x 6 root root 0 May 30 17:08 90496/
- dr-xr-xr-x 6 root root 0 May 30 17:08 90497/
- 2015-05-29 10:20:45.076105 7fa1c6a1a700 0 mon.jnqfg@2(peon).data_health(30) update_stats avail 92% total 95990980 used 2183384 avail 88908400
值得一提的是,ceph维护了一个m_recent队列,所有的消息都会存放到该队列中去,哪怕优先级比较低,不会打印到日志文件中去。
这就是_flush 函数中的 enqueue做的事情:
当然了,队列是有大小的限制,否则队列就会膨胀,导致内存耗尽。这个默认的限制是10000.
- while (m_recent.m_len > m_max_recent) {
- delete m_recent.dequeue();
- }
ceph提供了方法来查看最近的log,这就是log dump方法。
- ceph daemon /var/run/ceph/ceph-mon.*asok log dump
- {}
表面看啥也没输出,实际上将log dump到了日志文件中,比如我们例子,在log文件中出现:
- v2 ==== 42+0+0 (3105867923 0 0) 0x48d6bc0 con 0x3d5c9a0
- -13> 2015-05-30 17:23:06.772212 7f5856cac700 10 mon.jnqfg@2(peon) e3 handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
- -12> 2015-05-30 17:23:06.772217 7f5856cac700 10 mon.jnqfg@2(peon) e3 check_sub monmap next 4 have 3
- -11> 2015-05-30 17:23:06.772222 7f5856cac700 10 mon.jnqfg@2(peon).osd e260 check_sub 0x631bdc0 next 0 (onetime)
- -10> 2015-05-30 17:23:06.773223 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260 src has 1..260) v3 -- ?+0 0x4230b40
- -9> 2015-05-30 17:23:06.773239 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663 10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0 0x48d6680
- -8> 2015-05-30 17:23:06.773586 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663 10.16.20.181:0/1138587 4 ==== mon_subscribe({monmap=4+,osdmap=0}) v2 ==== 42+0+0 (3105867923 0 0) 0x48d5180 con 0x3d5c9a0
- -7> 2015-05-30 17:23:06.773607 7f5856cac700 10 mon.jnqfg@2(peon) e3 handle_subscribe mon_subscribe({monmap=4+,osdmap=0}) v2
- -6> 2015-05-30 17:23:06.773613 7f5856cac700 10 mon.jnqfg@2(peon) e3 check_sub monmap next 4 have 3
- -5> 2015-05-30 17:23:06.773620 7f5856cac700 10 mon.jnqfg@2(peon).osd e260 check_sub 0x631a7c0 next 0 (onetime)
- -4> 2015-05-30 17:23:06.774626 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.? 10.16.20.181:0/1138587 -- osd_map(260..260 src has 1..260) v3 -- ?+0 0x4232ac0
- -3> 2015-05-30 17:23:06.774641 7f5856cac700 1 -- 10.16.20.183:6789/0 --> client.4442663 10.16.20.181:0/1138587 -- mon_subscribe_ack(300s) v1 -- ?+0 0x48d6bc0
- -2> 2015-05-30 17:23:06.775345 7f5856cac700 1 -- 10.16.20.183:6789/0 <== client.4442663 10.16.20.181:0/1138587 5 ==== mon_command({"prefix": "get_command_descriptions"} v 0) v1 ==== 80+0+0 (3374501561 0 0) 0x477f0e0 con 0x3d5c9a0
- -1> 2015-05-30 17:23:06.776789 7f5856cac700 1 -- 10.16.20.183:6789/0 --> 10.16.20.181:0/1138587 -- mon_command_ack([{"prefix": "get_command_descriptions"}]=0 v0) v1 -- ?+24689 0x3e62760 con 0x3d5c9a0
- 0> 2015-05-30 17:23:06.821389 7f5859662700 1 do_command 'log dump'
很有意思的是,ceph如何做到 ceph daemon /var/run/ceph/ceph-mon.*asok log dump 就把log dump到日志文件的。 这就牵扯到admin socket机制了。
admin socket 机制并不是一个非常新的机制,这是一个很commmon的设计。
很多进程,设计的时候,需要设计一个手段,能够运行时,接收用户的指令,不能将程序设计成一个黑匣子,相反,要提供一些手段,使是用户或者可以干预进程的运行(如改变配置项),或者可以获取到进程运行的状态信息。
admin socket机制,那是下一篇的任务。