Skip to content

Latest commit

 

History

History
163 lines (148 loc) · 4.82 KB

File metadata and controls

163 lines (148 loc) · 4.82 KB

initSidecarInjector

// initSidecarInjector wires up the sidecar injection webhook for this server.
//
// It returns (nil, nil) without creating a webhook when the injection
// directory is empty, when injectionEnabled is off, or when the local config
// file does not exist and the injector ConfigMap is not found either. Any other
// error from the ConfigMap lookup, from building the file watcher, or from
// creating the webhook is returned to the caller.
//
// On success the webhook is created with s.httpsMux as its mux, and start
// functions are registered to run it and, when a webhook config name is set,
// to run the certificate patch loop.
func (s *Server) initSidecarInjector(args *PilotArgs) (*inject.Webhook, error) {
	// At present this is the constant "./var/lib/istio/inject".
	injectDir := args.InjectionOptions.InjectionDirectory
	if injectDir == "" || !injectionEnabled.Get() {
		log.Infof("Skipping sidecar injector, injection path is missing or disabled.")
		return nil, nil
	}

	// Injection is started if either a local or a remote configuration exists.
	// The local file is probed first.
	var watcher inject.Watcher
	localConfig := filepath.Join(injectDir, "config")
	if _, statErr := os.Stat(localConfig); os.IsNotExist(statErr) {
		// No local config: look up the injector ConfigMap instead (per the
		// original note, its name is the constant istio-sidecar-injector).
		cmName := getInjectorConfigMapName(args.Revision)
		configMaps := s.kubeClient.CoreV1().ConfigMaps(args.Namespace)
		if _, getErr := configMaps.Get(context.TODO(), cmName, metav1.GetOptions{}); getErr != nil {
			if !errors.IsNotFound(getErr) {
				return nil, getErr
			}
			log.Infof("Skipping sidecar injector, template not found")
			return nil, nil
		}
		watcher = inject.NewConfigMapWatcher(s.kubeClient, args.Namespace, cmName, "config", "values")
	} else {
		// Any Stat result other than "does not exist" is treated as a local
		// config being present; NewFileWatcher reports real problems.
		var watchErr error
		watcher, watchErr = inject.NewFileWatcher(localConfig, filepath.Join(injectDir, "values"))
		if watchErr != nil {
			return nil, watchErr
		}
	}

	log.Info("initializing sidecar injector")

	wh, err := inject.NewWebhook(inject.WebhookParameters{
		Watcher: watcher,
		Env:     s.environment,
		// Monitoring is disabled here: the injection metrics are already
		// picked up by Pilot's metrics exporter.
		MonitoringPort: -1,
		Mux:            s.httpsMux,
		Revision:       args.Revision,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create injection webhook: %v", err)
	}

	// Patch the cert only if a webhook config name is provided.
	// This requires RBAC permissions - a low-priv Istiod should not attempt to
	// patch but rely on the operator or CI/CD.
	if features.InjectionWebhookConfigName.Get() != "" {
		patchCert := func(stop <-chan struct{}) error {
			// No leader election - different istiod revisions will patch their own cert.
			bundle := s.caBundlePath
			if hasCustomTLSCerts(args.ServerOptions.TLSOptions) {
				bundle = args.ServerOptions.TLSOptions.CaCertFile
			}
			webhooks.PatchCertLoop(features.InjectionWebhookConfigName.Get(), webhookName, bundle, s.kubeClient, stop)
			return nil
		}
		s.addStartFunc(patchCert)
	}

	// The webhook itself runs in its own goroutine until stop is closed.
	runWebhook := func(stop <-chan struct{}) error {
		go wh.Run(stop)
		return nil
	}
	s.addStartFunc(runWebhook)

	return wh, nil
}
// NewWebhook creates the injection webhook from the parameters in p.
//
// In the portion visible here it registers wh.serveInject on p.Mux for both
// the "/inject" and "/inject/" patterns, and adds a mesh handler that stores
// p.Env.Mesh() into wh.meshConfig while holding wh.mu.
//
// NOTE(review): the "..." lines mark code omitted from this excerpt; how wh is
// constructed and what is returned is not shown here.
func NewWebhook(p WebhookParameters) (*Webhook, error) {
	...
    // Register the concrete request handlers.
	p.Mux.HandleFunc("/inject", wh.serveInject)
	p.Mux.HandleFunc("/inject/", wh.serveInject)

	// Replace the cached mesh config under the lock each time the handler
	// fires (presumably on mesh config changes — confirm against the Watcher).
	p.Env.Watcher.AddMeshHandler(func() {
		wh.mu.Lock()
		wh.meshConfig = p.Env.Mesh()
		wh.mu.Unlock()
	})

	...
}
// serveInject is the HTTP handler for sidecar injection admission requests.
//
// It decodes the request body into an AdmissionReview, runs wh.inject on it,
// and writes back a JSON-encoded AdmissionReview carrying the resulting
// AdmissionResponse. Decode and conversion failures are reported through
// handleError and answered with toAdmissionResponse(err) instead of running
// injection. If the response cannot be encoded, a 500 is returned and nothing
// else is written.
//
// NOTE(review): the "..." line marks code omitted from this excerpt; body and
// path (and presumably handleError) are defined there — confirm.
func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
	totalInjections.Increment()
	...

	var reviewResponse *kube.AdmissionResponse
	var obj runtime.Object
	var ar *kube.AdmissionReview
	// The webhook is handed an AdmissionReview: the AdmissionRequest is read
	// from it and the AdmissionResponse is written back into it.
	if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
		handleError(fmt.Sprintf("Could not decode body: %v", err))
		// Answer with the error information.
		reviewResponse = toAdmissionResponse(err)
	} else {
		log.Debugf("AdmissionRequest for path=%s\n", path)
		ar, err = kube.AdmissionReviewKubeToAdapter(out)
		if err != nil {
			handleError(fmt.Sprintf("Could not decode object: %v", err))
			// Do not run injection on a review that failed conversion; ar may
			// be nil or incomplete here. Answer with the error instead.
			reviewResponse = toAdmissionResponse(err)
		} else {
			// Perform the injection.
			reviewResponse = wh.inject(ar, path)
		}
	}

	response := kube.AdmissionReview{}
	response.Response = reviewResponse
	var responseKube runtime.Object
	var apiVersion string
	if ar != nil {
		// Echo the request's type information and UID back to the caller.
		apiVersion = ar.APIVersion
		response.TypeMeta = ar.TypeMeta
		if response.Response != nil && ar.Request != nil {
			response.Response.UID = ar.Request.UID
		}
	}
	// Build the reply in the form matching the request's API version.
	responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
	resp, err := json.Marshal(responseKube)
	if err != nil {
		log.Errorf("Could not encode response: %v", err)
		http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
		// Stop here: there is no payload, and the error reply is already sent.
		return
	}
	if _, err := w.Write(resp); err != nil {
		// The status line has already been sent by Write, so an http.Error
		// here could not change the reply; just record the failure.
		log.Errorf("Could not write response: %v", err)
	}
}
// inject produces the AdmissionResponse for the admission review ar.
//
// In the portion visible here it calls injectPod(params) to obtain the patch
// bytes. On failure it reports the error through handleError and returns
// toAdmissionResponse(err). On success it increments
// totalSuccessfulInjections and returns an allowed response carrying the
// patch with patch type "JSONPatch".
//
// NOTE(review): the "..." line marks code omitted from this excerpt; params
// (and how ar and path are used) is established there and is not shown here.
func (wh *Webhook) inject(ar *kube.AdmissionReview, path string) *kube.AdmissionResponse {
    ...
    // Generate the patch.
	patchBytes, err := injectPod(params)
	if err != nil {
		handleError(fmt.Sprintf("Pod injection failed: %v", err))
		return toAdmissionResponse(err)
	}

    // Build the AdmissionResponse to return.
	reviewResponse := kube.AdmissionResponse{
		Allowed: true,
		Patch:   patchBytes,
		// PatchType is a *string, so the literal is wrapped in a function
		// that returns the address of a local copy.
		PatchType: func() *string {
			pt := "JSONPatch"
			return &pt
		}(),
	}
	totalSuccessfulInjections.Increment()
	return &reviewResponse
}