[containerd] 初始化流程概览

1. 环境

  • containerd版本:v1.7.2,containerd debug搭建教程链接
  • 操作系统:Ubuntu22.04

2. 初始化流程

  containerd的入口为:cmd/containerd/main.go,如下:

func main() {
	// TODO 实例化containerd
	app := command.App()
	// 实际上这里的app是一个命令行工具封装的,app.Run的运行也是固定的,主要是为了执行app.Action,所以只需要重点分析app.Action干了啥
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
		os.Exit(1)
	}
}

  这里的App实际上一个命令行工具的封装,这里执行app.Run的时候实际上执行的时AppAction方法,因此我们需要关心的是containerd是如何实现这个Action

  不过,话说回来,在如今cobra大行其道的时候,containerd居然会使用urfave这个命令行工具,这个工具相比于cobra有何优略势以后倒是可以研究下。

  containerd实现的App有很多细节我们并不需要关心,这里我们重点关心containerd是如何实现Action方法的,毕竟,containerd开始运行后,第一时间就是执行Action方法

func App() *cli.App {
	... // 省略不需要太关心的代码
	app.Action = func(context *cli.Context) error {
		var (
			start       = time.Now()
			signals     = make(chan os.Signal, 2048)
			serverC     = make(chan *server.Server, 1)
			ctx, cancel = gocontext.WithCancel(gocontext.Background())
			config      = defaultConfig()
		)

		defer cancel()

		// Only try to load the config if it either exists, or the user explicitly
		// told us to load this6 path.
		configPath := context.GlobalString("config") // 获取配置文件路径
		_, err := os.Stat(configPath)
		if !os.IsNotExist(err) || context.GlobalIsSet("config") {
			if err := srvconfig.LoadConfig(configPath, config); err != nil {
				return err
			}
		}

		// Apply flags to the config 解析/etc/containerd/config.toml配置文件到config对象当中
		if err := applyFlags(context, config); err != nil {
			return err
		}

		if config.GRPC.Address == "" {
			return fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)
		}
		if config.TTRPC.Address == "" {
			// If TTRPC was not explicitly configured, use defaults based on GRPC.
			config.TTRPC.Address = config.GRPC.Address + ".ttrpc"
			config.TTRPC.UID = config.GRPC.UID
			config.TTRPC.GID = config.GRPC.GID
		}

		// Make sure top-level directories are created early. 确保一些目录必须存在
		if err := server.CreateTopLevelDirectories(config); err != nil {
			return err
		}

		// Stop if we are registering or unregistering against Windows SCM. 仅和Windows有关
		stop, err := registerUnregisterService(config.Root)
		if err != nil {
			logrus.Fatal(err)
		}
		if stop {
			return nil
		}

		done := handleSignals(ctx, signals, serverC, cancel) // 处理退出信号
		// start the signal handler as soon as we can to make sure that
		// we don't miss any signals during boot
		signal.Notify(signals, handledSignals...)

		// cleanup temp mounts
		if err := mount.SetTempMountLocation(filepath.Join(config.Root, "tmpmounts")); err != nil {
			return fmt.Errorf("creating temp mount location: %w", err)
		}
		// unmount all temp mounts on boot for the server
		warnings, err := mount.CleanupTempMounts(0)
		if err != nil {
			log.G(ctx).WithError(err).Error("unmounting temp mounts")
		}
		for _, w := range warnings {
			log.G(ctx).WithError(w).Warn("cleanup temp mount")
		}

		log.G(ctx).WithFields(log.Fields{
			"version":  version.Version,
			"revision": version.Revision,
		}).Info("starting containerd")

		type srvResp struct {
			s   *server.Server
			err error
		}

		// run server initialization in a goroutine so we don't end up blocking important things like SIGTERM handling
		// while the server is initializing.
		// As an example, opening the bolt database blocks forever if a containerd instance
		// is already running, which must then be forcibly terminated (SIGKILL) to recover.
		chsrv := make(chan srvResp)
		go func() {
			defer close(chsrv)

			// TODO 这里干了啥?
			server, err := server.New(ctx, config)
			if err != nil {
				select {
				case chsrv <- srvResp{err: err}:
				case <-ctx.Done():
				}
				return
			}

			// Launch as a Windows Service if necessary 这里主要是在适配windows,直接忽略
			if err := launchService(server, done); err != nil {
				logrus.Fatal(err)
			}
			select {
			case <-ctx.Done():
				server.Stop()
			case chsrv <- srvResp{s: server}:
			}
		}()

		var server *server.Server
		select { // 等待Containerd Server初始化完成
		case <-ctx.Done():
			return ctx.Err()
		case r := <-chsrv:
			if r.err != nil {
				return r.err
			}
			server = r.s
		}

		// We don't send the server down serverC directly in the goroutine above because we need it lower down.
		select { // TODO 这里为啥这么写,没看懂上面的注释
		case <-ctx.Done():
			return ctx.Err()
		case serverC <- server:
		}

		// 开启containerd的debug功能,开启后可以通过/debug/vars, /debug/pprof这样的URL查看containerd部分数据
		if config.Debug.Address != "" {
			var l net.Listener
			if isLocalAddress(config.Debug.Address) {
				if l, err = sys.GetLocalListener(config.Debug.Address, config.Debug.UID, config.Debug.GID); err != nil {
					return fmt.Errorf("failed to get listener for debug endpoint: %w", err)
				}
			} else {
				if l, err = net.Listen("tcp", config.Debug.Address); err != nil {
					return fmt.Errorf("failed to get listener for debug endpoint: %w", err)
				}
			}
			serve(ctx, l, server.ServeDebug)
		}
		// containerd的指数据
		if config.Metrics.Address != "" {
			l, err := net.Listen("tcp", config.Metrics.Address)
			if err != nil {
				return fmt.Errorf("failed to get listener for metrics endpoint: %w", err)
			}
			serve(ctx, l, server.ServeMetrics)
		}
		// setup the ttrpc endpoint 创建containerd.sock.ttrpc文件
		tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
		if err != nil {
			return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
		}
		serve(ctx, tl, server.ServeTTRPC)

		if config.GRPC.TCPAddress != "" {
			l, err := net.Listen("tcp", config.GRPC.TCPAddress)
			if err != nil {
				return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)
			}
			serve(ctx, l, server.ServeTCP)
		}
		// setup the main grpc endpoint 创建container.sock文件
		l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
		if err != nil {
			return fmt.Errorf("failed to get listener for main endpoint: %w", err)
		}
		serve(ctx, l, server.ServeGRPC)

		readyC := make(chan struct{})
		go func() {
			server.Wait()
			close(readyC)
		}()

		select {
		case <-readyC:
			if err := notifyReady(ctx); err != nil {
				log.G(ctx).WithError(err).Warn("notify ready failed")
			}
			// containerd成功启动
			log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
			<-done
		case <-done:
		}
		return nil
	}
	return app
}

  如上所示,这里我们忽略的一些无关紧要的代码,重点关心Action的实现。

  通过分析,我们发现,Action方法主要是做了如下一些操作:

  • 1、加载containerd的配置,并校验某些配置的值,如果没有指定containerd配置文件的位置,那么containerd默认的配置文件为/etc/containerd/config.toml
  • 2、创建containerdroot目录以及state目录;实际上,所谓的root目录,指的是containerd保存元数据的位置,譬如镜像、运行时数据、快照等等,默认root目录就是/var/lib/containerd。而所谓的state目录则是存放containerd socket文件的目录,该目录的默认值为:/run/containerd
  • 3、监听SIGPIPE, SIGUSR1, SIGTERM, SIGINT信号
  • 4、清理临时目录
  • 5、实例化containerd server,这个就是我们的重点,稍后我们着重分析
  • 6、根据配置暴露debug接口,开启后可以通过/debug/vars, /debug/pprof这样的URL查看containerd部分数据
  • 7、根据配置暴露metric指标
  • 8、运行GRPC, TCP, TTRPC服务

  解析来我们继续分析containerd server是如何实例化的。

func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
	// 主要是为了设置OOM参数以及Cgroup
	if err := apply(ctx, config); err != nil {
		return nil, err
	}
	// 设置超时参数,这里使用一个Map来保存
	for key, sec := range config.Timeouts {
		d, err := time.ParseDuration(sec)
		if err != nil {
			return nil, fmt.Errorf("unable to parse %s into a time duration", sec)
		}
		timeout.Set(key, d)
	}
	// TODO 加载插件
	plugins, err := LoadPlugins(ctx, config)
	if err != nil {
		return nil, err
	}
	// TODO StreamProcessor是啥玩意?
	for id, p := range config.StreamProcessors {
		diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env))
	}

	// TODO 增加了GRPC Server Option参数
	serverOpts := []grpc.ServerOption{
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
			otelgrpc.StreamServerInterceptor(),
			grpc_prometheus.StreamServerInterceptor,
			streamNamespaceInterceptor,
		)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			otelgrpc.UnaryServerInterceptor(),
			grpc_prometheus.UnaryServerInterceptor,
			unaryNamespaceInterceptor,
		)),
	}
	// 设置GRPC可以消息的最大阈值
	if config.GRPC.MaxRecvMsgSize > 0 {
		serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))
	}
	// 设置GRPC发送消息的最大阈值
	if config.GRPC.MaxSendMsgSize > 0 {
		serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
	}
	// 实例化TTRPCServer,所谓的TTRPC,实际上就设置GRPC ober TLS
	ttrpcServer, err := newTTRPCServer()
	if err != nil {
		return nil, err
	}
	tcpServerOpts := serverOpts
	// 设置TLS证书
	if config.GRPC.TCPTLSCert != "" {
		log.G(ctx).Info("setting up tls on tcp GRPC services...")

		tlsCert, err := tls.LoadX509KeyPair(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
		if err != nil {
			return nil, err
		}
		tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}}

		if config.GRPC.TCPTLSCA != "" {
			caCertPool := x509.NewCertPool()
			caCert, err := os.ReadFile(config.GRPC.TCPTLSCA)
			if err != nil {
				return nil, fmt.Errorf("failed to load CA file: %w", err)
			}
			caCertPool.AppendCertsFromPEM(caCert)
			tlsConfig.ClientCAs = caCertPool
			tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
		}

		tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
	}

	// grpcService allows GRPC services to be registered with the underlying server
	type grpcService interface {
		Register(*grpc.Server) error
	}

	// tcpService allows GRPC services to be registered with the underlying tcp server
	type tcpService interface {
		RegisterTCP(*grpc.Server) error
	}

	// ttrpcService allows TTRPC services to be registered with the underlying server
	type ttrpcService interface {
		RegisterTTRPC(*ttrpc.Server) error
	}

	var (
		grpcServer = grpc.NewServer(serverOpts...)
		tcpServer  = grpc.NewServer(tcpServerOpts...)

		grpcServices  []grpcService
		tcpServices   []tcpService
		ttrpcServices []ttrpcService

		s = &Server{
			grpcServer:  grpcServer,
			tcpServer:   tcpServer,
			ttrpcServer: ttrpcServer,
			config:      config,
		}
		// TODO: Remove this in 2.0 and let event plugin crease it
		events      = exchange.NewExchange()
		initialized = plugin.NewPluginSet()
		required    = make(map[string]struct{})
	)
	for _, r := range config.RequiredPlugins {
		required[r] = struct{}{}
	}
	for _, p := range plugins {
		id := p.URI()
		reqID := id
		if config.GetVersion() == 1 {
			reqID = p.ID
		}
		log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)

		initContext := plugin.NewContext(
			ctx,
			p,
			initialized,
			config.Root,
			config.State,
		)
		initContext.Events = events
		initContext.Address = config.GRPC.Address
		initContext.TTRPCAddress = config.TTRPC.Address
		initContext.RegisterReadiness = s.RegisterReadiness

		// load the plugin specific configuration if it is provided
		if p.Config != nil {
			// 反序列化当前插件的配置
			pc, err := config.Decode(p)
			if err != nil {
				return nil, err
			}
			initContext.Config = pc
		}
		// 执行插件的InitFn函数,并实例化插件实体
		result := p.Init(initContext)
		if err := initialized.Add(result); err != nil {
			return nil, fmt.Errorf("could not add plugin result to plugin set: %w", err)
		}

		// 获取实例化的插件实体,并且获取实例化插件实体时的错误
		instance, err := result.Instance()
		if err != nil {
			if plugin.IsSkipPlugin(err) {
				log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
			} else {
				log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
			}
			if _, ok := required[reqID]; ok {
				return nil, fmt.Errorf("load required plugin %s: %w", id, err)
			}
			continue
		}

		// 每删除一个插件,都需要从required中删除此插件
		delete(required, reqID)
		// check for grpc services that should be registered with the server
		if src, ok := instance.(grpcService); ok {
			grpcServices = append(grpcServices, src)
		}
		if src, ok := instance.(ttrpcService); ok {
			ttrpcServices = append(ttrpcServices, src)
		}
		if service, ok := instance.(tcpService); ok {
			tcpServices = append(tcpServices, service)
		}

		s.plugins = append(s.plugins, result)
	}
	// 如果插件加载完成,但是还有必要的插件没有加载,那么只能退出containerd的初始化
	if len(required) != 0 {
		var missing []string
		for id := range required {
			missing = append(missing, id)
		}
		return nil, fmt.Errorf("required plugin %s not included", missing)
	}

	// register services after all plugins have been initialized
	// 注册服务
	for _, service := range grpcServices {
		if err := service.Register(grpcServer); err != nil {
			return nil, err
		}
	}
	for _, service := range ttrpcServices {
		if err := service.RegisterTTRPC(ttrpcServer); err != nil {
			return nil, err
		}
	}
	for _, service := range tcpServices {
		if err := service.RegisterTCP(tcpServer); err != nil {
			return nil, err
		}
	}
	return s, nil
}

  以上代码就是containerd server初始化逻辑,主要做了这么几个事情:

  • 1、根据containerd的配置,设置OOM以及Cgroup参数
  • 2、设置超时参数,主要设置了
    • io.containerd.timeout.task.state = 2s
    • io.containerd.timeout.bolt.open = 0s
    • io.containerd.timeout.metrics.shimstats = 2s
    • io.containerd.timeout.shim.cleanup = 5s
    • io.containerd.timeout.shim.load = 5s
    • io.containerd.timeout.shim.shutdown = 3s
  • 3、加载插件
    • 其一是动态加载plugin_dir目录中包含的插件,实际上追踪进去你会发现,注释会提示containerd 1.8以前都不会支持动态加载插件,估计这个特性还在开发当中。
    • 其二是加载content插件,这个插件的具体作用我们以后会分析,看了containerd的同学估计会对这个插件有点印象,擦测这个插件是实现ContentService的关键,以后在分析
    • 其三是加载代理插件,containerd的代理插件具体作用不得而知,以后在分析吧,今天我们先看个整体流程,毕竟我也是初学者。
    • 实际上通过IDEA debug源码的时候,你会发现,containerd最终会注册50个插件。然鹅,在debug的时候根本就没有看到注册的代码,最终跟踪下来,你会发现,这些插件除了content插件,其余的插件都是各个插件在自己的Init函数当汇总注册的,containerd已启动的时候就会注册这些插件
  • 4、处理stream process配置,这玩意具体作用现在我也不知道,后续在分析吧。
  • 5、根据之前注册的插件根据插件的配置实例化插件,如果有任何必须的插件没有初始化,就认为containerd初始化失败
  • 6、注册服务

  以上就是containerd的总体初始化流程,今天只是看了一个大概,其中还有很多不懂的地方,后续我们再各个击破。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/52331.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue没有node_modules怎么办

npm install 一下 然后再npm run serve 就可以运行了

记一次偶然的网站sql注入

自己学了点渗透的内容后就开始尝试挖漏洞了&#xff0c;偶然发现了这个yp网站&#xff0c;由于好奇心就浏览了一下里面的内容&#xff0c;突然注意到有个id的地方跳转页面&#xff0c;于是就想试试看有没有注入&#xff0c;就有了以下的内容。。。 界面如下 当时就是好奇点进去…

事件标志组

Q: 什么是事件标志组&#xff1f; A: 事件标志位&#xff1a;表明某个事件是否发生&#xff0c;联想&#xff1a;全局变量 flag。通常按位表示&#xff0c;每一个位表示一个事件&#xff08;高8位不算&#xff09; 事件标志组是一组事件标志位的集合&#xff0c; 可以简单的理…

ElasticSearch基础篇-Java API操作

ElasticSearch基础-Java API操作 演示代码 创建连接 POM依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:sch…

springCloud Eureka注册中心配置详解

1、创建一个springBoot项目 2、在springBoot项目中添加SpringCloud依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.3</version><type>…

从源程序到可执行文件的四个过程

从源程序到可执行文件的四个过程 预处理编译汇编链接 程序要运行起来&#xff0c;必须要经过四个步骤&#xff1a;预处理、编译、汇编和链接&#xff0c;如下图所示&#xff1a; -E选项&#xff1a;提示编译器执行完预处理就停下来&#xff0c;后边的编译、汇编、链接就先不执…

关于在VS2017中编译Qt项目遇到的问题

关于在VS2017中编译Qt项目遇到的问题 【QT】VS打开QT项目运行不成功 error MSB6006 “cmd.exe”已退出,代码为 2。如何在VS2017里部署的Qt Designer上编辑槽函数 【QT】VS打开QT项目运行不成功 error MSB6006 “cmd.exe”已退出,代码为 2。 链接 如何在VS2017里部署的Qt Design…

flask实现一个登录界面

flask实现一个登录界面 基础的Flask项目结构 forms.py&#xff1a;定义登录表单和表单字段的文件。templates/login.html&#xff1a;用于渲染登录表单的 HTML 模板文件。routes.py&#xff1a;定义应用的路由和视图函数的文件。__init__.py&#xff1a;创建并初始化 Flask 应…

解压缩软件WinRAR-bandizip-7z--洛

个人收集的解压软件&#xff01;后期还会更新 ------------------------------------------------------------------- WinRAR&#xff1a;密码1234WinRARhttps://wwzb.lanzoue.com/b0485ldcj BandiZip&#xff1a;密码1234 Bandizip-Professionalhttps://wwzb.lanzoue.com/…

SpringBoot内嵌的Tomcat:

SpringBoot内嵌Tomcat源码&#xff1a; 1、调用启动类SpringbootdemoApplication中的SpringApplication.run()方法。 SpringBootApplication public class SpringbootdemoApplication {public static void main(String[] args) {SpringApplication.run(SpringbootdemoApplicat…

windows下载安装FFmpeg

FFmpeg是一款强大的音视频处理软件&#xff0c;下面介绍如何在windows下下载安装FFmpeg 下载 进入官网: https://ffmpeg.org/download.html, 选择Windows, 然后选择"Windows builds from gyan.dev" 在弹出的界面中找到release builds, 然后选择一个版本&#xff0…

如何在MacBook上彻底删除mysql

好久以前安装过&#xff0c;但是现在配置mysql一直出错&#xff0c;索性全部删掉重新配置。 一、停止MySQL服务 首先&#xff0c;请确保 MySQL 服务器已经停止运行&#xff0c;以免影响后续的删除操作。 sudo /usr/local/mysql/support-files/mysql.server stop如果你输入之…

【RTT驱动框架分析03】- sfus flash 操作库的分析和基于STM32F103RCT6+CUBEMX的SFUS移植教程

sfus flash 操作库的分析 sfus 抽象 /*** serial flash device*/ typedef struct {char *name; /**< serial flash name */size_t index; /**< index of flash device information table see flash_…

IntelliJ IDEA流行的构建工具——Gradle

IntelliJ IDEA&#xff0c;是java编程语言开发的集成环境。IntelliJ在业界被公认为最好的java开发工具&#xff0c;尤其在智能代码助手、代码自动提示、重构、JavaEE支持、各类版本工具(git、svn等)、JUnit、CVS整合、代码分析、 创新的GUI设计等方面的功能可以说是超常的。 如…

Hive之窗口函数lag()/lead()

一、函数介绍 lag()与lead函数是跟偏移量相关的两个分析函数 通过这两个函数可以在一次查询中取出同一字段的前N行的数据(lag)和后N行的数据(lead)作为独立的列,从而更方便地进行进行数据过滤&#xff0c;该操作可代替表的自联接&#xff0c;且效率更高 lag()/lead() lag(c…

《GreenPlum系列-部署维护》GreenPlum数据库Standby故障处理

一、Standby故障 1.检查监控中心数据库状态 2.查看master节点数据库状态 su - gpadmin gpstate -f二、重启数据库 1.快速关闭数据库 [gpadminmdw pg_log]$ gpstop -M fast ... Continue with Greenplum instance shutdown Yy|Nn (defaultN): > y ...2.开启数据库 [gpad…

短视频矩阵源码开发搭建分享--多账号授权管理

目录 文章目录 前言 一、矩阵号系统是什么&#xff1f; 二、使用步骤 1.创建推广项目 2.多账号授权 3.企业号智能客服系统 总结 前言 短视频多账号矩阵系统&#xff0c;通过多账号一键授权管理的方式&#xff0c;为运营人员打造功能强大及全面的“矩阵式“管理平台。…

EMC学习笔记(二十)EMC常用元件简单介绍(二)

EMC常用元件简单介绍&#xff08;二&#xff09; 1.瞬态抑制二极管&#xff08;TVS&#xff09;2.气体放电管3.半导体放电管 电磁兼容性元件是解决电磁干扰发射和电磁敏感度问题的关键,正确选择和使用这些元件是做好电磁兼容性设计的前提。由于每一种电子元件都有它各自的特性,…

关于Java的多线程实现

多线程介绍 进程&#xff1a;进程指正在运行的程序。确切的来说&#xff0c;当一个程序进入内存运行&#xff0c;即变成一个进程&#xff0c;进程是处于运行过程中的程序&#xff0c;并且具有一定独立功能。 线程&#xff1a;线程是进程中的一个执行单元&#xff0c;负责当前进…

LabVIEW开发小型减阻试验平台

LabVIEW开发小型减阻试验平台 湍流摩擦在粘性流体的阻力中起着重要作用&#xff0c;减少湍流摩擦是流体力学领域的热门话题之一。在油气管道的长距离流体输送中&#xff0c;泵站提供的几乎所有动力都用于克服流体的胫骨摩擦。在流体输送领域&#xff0c;船舶的蒙皮摩擦阻力占总…