这篇文章来看下docker中的apiserver的实现。
我们从api := apiserver.New(serverConfig)这个代码开始看。来看下New的实现是什么,这里serverConfig结构体的内容为:
1 2 3 4 5 6 7 | serverConfig := &apiserver.ServerConfig{ Logging: true, EnableCors: daemonCfg.EnableCors, CorsHeaders: daemonCfg.CorsHeaders, Version: dockerversion.VERSION, } serverConfig = setPlatformServerConfig(serverConfig, daemonCfg) |
New的代码为:
1 2 3 4 5 6 7 8 9 | func New(cfg *ServerConfig) *Server { srv := &Server{ cfg: cfg, start: make(chan struct{}), } r := createRouter(srv) srv.router = r returnsrv } |
这里建立了一个server结构体:
1 2 3 4 5 6 7 | type Server struct{ daemon *daemon.Daemon cfg *ServerConfig router *mux.Router start chan struct{} servers []serverCloser } |
同时这个server结构体的router被赋值了一个router结构体,这里的router是什么呢?
熟悉OpenStack中使用频率特别高的routes组件的同学应该能猜到了,这里的router就是决定了一个URL最终映射到什么处理函数的一个路由。我们可以看到createRoute的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | // we keep enableCors just for legacy usage, need to be removed in the future func createRouter(s *Server) *mux.Router { r := mux.NewRouter() ifos.Getenv("DEBUG") != ""{ ProfilerSetup(r, "/debug/") } m := map[string]map[string]HttpApiFunc{ "GET": { "/_ping": s.ping, ...... }, "POST": { "/auth": s.postAuth, ...... }, "DELETE": { "/containers/{name:.*}": s.deleteContainers, ...... }, "OPTIONS": { "": s.optionsHandler, }, } // If "api-cors-header" is not given, but "api-enable-cors" is true, we set cors to "*" // otherwise, all head values will be passed to HTTP handler corsHeaders := s.cfg.CorsHeaders ifcorsHeaders == ""&& s.cfg.EnableCors { corsHeaders = "*" } formethod, routes := range m { forroute, fct := range routes { logrus.Debugf("Registering %s, %s", method, route) // NOTE: scope issue, make sure the variables are local and won't be changed localRoute := route localFct := fct localMethod := method // build the handler function f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version)) // add the new route iflocalRoute == ""{ r.Methods(localMethod).HandlerFunc(f) } else{ r.Path("/v{version:[0-9.]+}"+ localRoute).Methods(localMethod).HandlerFunc(f) r.Path(localRoute).Methods(localMethod).HandlerFunc(f) } } } returnr } |
上面的代码首先建立了一个mux.NewRouter()结构体r,接着我们可以看到一个m的map,其中存放了HTTP请求类型、URL地址以及实际的处理函数的映射关系。然后会将这个map注入到r中,来看下面的这段代码:
1 2 3 4 5 6 7 8 9 10 | // build the handler function f := makeHttpHandler(s.cfg.Logging, localMethod, localRoute, localFct, corsHeaders, version.Version(s.cfg.Version)) // add the new route iflocalRoute == ""{ r.Methods(localMethod).HandlerFunc(f) } else{ r.Path("/v{version:[0-9.]+}"+ localRoute).Methods(localMethod).HandlerFunc(f) r.Path(localRoute).Methods(localMethod).HandlerFunc(f) } |
makeHttpHandler实质上是对localFct做了一个wrap,也就是说当apiserver收到一个HTTP请求的时候,makeHttpHandler生成的func会先对这个HTTP请求做一些检查,比如client和server的版本号是否ok等等,然后才会调用localFct:
1 2 3 4 | iferr := handlerFunc(version, w, r, mux.Vars(r)); err != nil { logrus.Errorf("Handler for %s %s returned error: %s", localMethod, localRoute, err) httpError(w, err) } |
接着代码会判断localRoute是否为空字符串,如果是的话就不绑定带版本号的URL了。至于r.Path(localRoute).Methods(localMethod).HandlerFunc(f)做的就是在r中存放这个信息。熟悉Python中routes或者是熟悉tornado的同学应该很容易理解这个过程。之后在看某个方法的具体实现的时候可以来这里找到URL的mapping,然后查看对应函数的实现。
现在我们有了一个server,看下这个server启动时发生了什么。在代码中可以看到:
1 2 3 4 5 6 7 8 | go func() { iferr := api.ServeApi(flHosts); err != nil { logrus.Errorf("ServeAPI error: %v", err) serveAPIWait <- err return } serveAPIWait <- nil }() |
这里应该是启动我们的server了,看下其实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | // ServeApi loops through all of the protocols sent in to docker and spawns // off a go routine to setup a serving http.Server for each. func (s *Server) ServeApi(protoAddrs []string) error { var chErrors = make(chan error, len(protoAddrs)) for_, protoAddr := range protoAddrs { protoAddrParts := strings.SplitN(protoAddr, "://", 2) iflen(protoAddrParts) != 2 { returnfmt.Errorf("bad format, expected PROTO://ADDR") } srv, err := s.newServer(protoAddrParts[0], protoAddrParts[1]) iferr != nil { returnerr } s.servers = append(s.servers, srv...) for_, s := range srv { logrus.Infof("Listening for HTTP on %s (%s)", protoAddrParts[0], protoAddrParts[1]) go func(s serverCloser) { iferr := s.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") { err = nil } chErrors <- err }(s) } } fori := 0; i < len(protoAddrs); i++ { err := <-chErrors iferr != nil { returnerr } } returnnil } |
首先根据监听的地址分别调用s.newServer(protoAddrParts[0], protoAddrParts[1])创建一个干活的server,然后后者调用Serve开始服务。s.newServer会根据监听的协议类型建立套接字,然后启动一个HttpServer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | ...... switchproto { ...... case"tcp": l, err := s.initTcpSocket(addr) iferr != nil { returnnil, err } ls = append(ls, l) default: returnnil, fmt.Errorf("Invalid protocol format: %q", proto) } var res []serverCloser for_, l := range ls { res = append(res, &HttpServer{ &http.Server{ Addr: addr, Handler: s.router, }, l, }) } returnres, nil } |
这里HttpServer的结构体为:
1 2 3 4 | type HttpServer struct{ srv *http.Server l net.Listener } |
而http.Server则是go的原生server。
稍微来理一下,我们现在有下面的结构:
* 一个api,这个api是docker实现的server,代表了提供HTTP服务的一个逻辑上的server
* api下面有一个servers数组,每个数组都是实际提供服务的server,之所以有这么多server是因为可能存在多种监听方式或者是监听的地址
* api.servers中的server的最终实现是通过go的http.Server实现的
再来接着看主线代码:
1 2 3 4 5 6 7 8 9 10 11 12 | registryService := registry.NewService(registryCfg) d, err := daemon.NewDaemon(daemonCfg, registryService) iferr != nil { ifpfile != nil { iferr := pfile.Remove(); err != nil { logrus.Error(err) } } logrus.Fatalf("Error starting daemon: %v", err) } logrus.Info("Daemon has completed initialization") |
这里我们看到了一个registryService以及一个daemon.NewDaemon的函数,registryService目前只是一个保存了配置信息的结构体:
1 2 3 4 5 6 7 8 9 10 11 | // NewService returns a new instance of Service ready to be // installed no an engine. func NewService(options *Options) *Service { return&Service{ Config: NewServiceConfig(options), } } ...... type Service struct{ Config *ServiceConfig } |
daemon.NewDaemon比较复杂,我们最后再看。先来继续看api的剩余代码:
1 2 3 | // after the daemon is done setting up we can tell the api to start // accepting connections with specified daemon api.AcceptConnections(d) |
1 2 3 4 5 6 7 8 9 10 11 12 | func (s *Server) AcceptConnections(d *daemon.Daemon) { // Tell the init daemon we are accepting requests s.daemon = d s.registerSubRouter() go systemd.SdNotify("READY=1") // close the lock so the listeners start accepting connections select { case<-s.start: default: close(s.start) } } |
这里通过channel释放了start的锁。start的锁是在等待呢?上面提到的套接字建立的代码中可以看到类似如下代码:
1 2 3 4 5 | case"tcp": l, err := s.initTcpSocket(addr) ...... case"unix": l, err := sockets.NewUnixSocket(addr, s.cfg.SocketGroup, s.start) |
initTcpSocket中也会传递s.start。
apiserver基本上就是这么多东西,其实就是一个带了route的http server。接下来有两大块地方要分析,一个是我们的daemon,还有一个就是请求的真正实现。我们先通过一个简单的请求的实现代码来看下daemon的作用:
1 | "/containers/ps": s.getContainersJSON, |
s.getContainersJSON的实现为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | func (s *Server) getContainersJSON(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { iferr := parseForm(r); err != nil { returnerr } config := &daemon.ContainersConfig{ All: boolValue(r, "all"), Size: boolValue(r, "size"), Since: r.Form.Get("since"), Before: r.Form.Get("before"), Filters: r.Form.Get("filters"), } iftmpLimit := r.Form.Get("limit"); tmpLimit != ""{ limit, err := strconv.Atoi(tmpLimit) iferr != nil { returnerr } config.Limit = limit } containers, err := s.daemon.Containers(config) iferr != nil { returnerr } returnwriteJSON(w, http.StatusOK, containers) } |
可以看到实际上这个请求是由s.daemon完成的。我们上面看到了api其实就是一个路由请求的框架,会把URL路由的s的相关函数上。但是这些函数其实还是空架子,真正干活的是s.daemon。现在对daemon的地位及功能有了大概的认识后,我们接下来的文章会分析daemon的实现以及一些重要请求处理函数的实现。