Kubernetes Meetup Tokyo #23にいってきました

2019年9月27日に開催されたKubernetes Meetup Tokyo #23にブログ枠で参加させていただいたので、その模様について書いていきます。

会場

会場は渋谷ストリームの隣に誕生した渋谷スクランブルスクエアです。ビルの正式オープンが11月からとのことで絶賛内装工事中でした。 渋谷駅直結でとても便利な立地です。スポンサーはCyberAgent社です。

f:id:yamaguchi7073xtt:20190928172718j:plainf:id:yamaguchi7073xtt:20190928172803j:plainf:id:yamaguchi7073xtt:20190928172807j:plain
Session後の懇談会の様子

Session

ゼロから始めるKubernetes Controller

技術書典7で買わせていただいた、実践入門Kubernetes カスタムコントローラへの道の著者であられるバルゴさんによるKubernetes Controllerについての発表。

f:id:yamaguchi7073xtt:20190928180008j:plain speakerdeck.com

Controllerの概要から内部の実装についてまで説明されている100P超えの力作スライドです。 ReplicaSetをapplyしてからデリバリされるまでの処理を該当ソースへの参照箇所まで添えて説明してくれており開始10分でこれてよかったと思えました。

kubebuilder/controller-runtime入門 with v2 updates

kfyharukzさんによるkubebuilder v2の紹介/デモ。

kubernetesのAPIを拡張するためのSDK kubebuilderについての発表です。

www.slideshare.net

Kubernetesのbuiltin Resouceの理解もまだあやしい自分にとっては発展的な内容でした。CKA合格できたらCustom Resourceにも挑戦したいです。

Kubernetes 1.16: SIG-API Machineryの変更内容

Introduction of Operator Frameworkが行われる予定でしたが、発表者の方が体調不良とのことで内容が変更になりました。

LadicleさんによるKubernetes 1.16: SIG-API Machineryの変更内容qiita.com そもそもSIGという単語がわかっていませんでした。調べてみたところKubernetes Projectの開発単位という感じでしょうか。 興味があるSIGから追いかけてみるととっかかりとしてはよいというアドバイスもいただきました。

github.com

zlab社がQiitaでKubernetesやGo関連で参考になる記事をたくさんあげてくれているのでフォローしていこうと思いました。

qiita.com

Kubernetes 1.16: SIG-CLI の変更内容

同じくzlab社すぱぶらさんによるKubernetes 1.16: SIG-CLI の変更内容です。 qiita.com

kubectlの実践的な解説で自分が打ったことがないコマンドやオプションばかりでした。kubectl debug は利用できたらとても便利そうなので待ち遠しいです。

LT

ClusterAPI v1alpha1 → v1alpha2

r_takaishiさんによるClusterAPIについて。 発表資料のリンクが見つけられず。 ClusterAPIについてはまったくわかっておらず。そもそもKubernetesってCluster(特にcontroll plane)は作成されている前提だと考えていたので気になるところです。 技術書典7では買えていなかったはじめるCluster API読んで出直します。

f:id:yamaguchi7073xtt:20190928193713j:plain

自動化はshell-operator とともに。

nwiizoさんさんによるshell(bash)とkubernetesについて。 そもそも、operatorについてのわかっていないので、なんかbashでもkubernetesの処理にはさめるんだなくらいの理解しかできませんでした。 ここは議論のあるところだと思いますが、自分は以下の点からあまりbashを本番関連の処理に組み込みたくないと考えているのですがどうなんでしょうか。

  • network処理には必ずtimeout設定したい
  • error handling
  • logging(structured, leveling,...)
  • signal handling(gracefully shutdown, resource cleanup等)
  • test書きたい
  • 依存を明示

(bashでできないことはないと思うのですが上記のことやろうとすると肥大化するかscriptの手軽さが結局失われる)

自作 Controller による Secret の配布と収集 - unblee

unbleeさんによる speakerdeck.com

wantedly社でのKubernetes運用上の課題をControllerを作成して解決されたお話でした。 1-MicroService 1-Namespaceで運用されているそうです、実運用されている方々のNamespaceの切り方についてはもっとお話を伺ってみたいと思いました。

懇談会

SessionとLTの間に30分程度の懇談会の時間が設けられています。(飲食物の提供はCyberAgent社!) たまたま技術書典7で買って、当日も読んでいたカスタムコントローラへの道の著者のバルゴさんが発表されていたので、本買いましたとご挨拶させていただきました。またCKA、CKADをお持ちとのことで、CKAのアドバイスも聞けました。

qiita.com

Ladicleさんの発表の際に用いられていたQiitaのiconどこかでみたことあるなー思っていたのですが、Software Design 2017年9月号の特集 Web技術【超】入門いま一度振り返るWebのしくみと開発方法でweb serverの実装をgoでやられている方であることを思い出しました。 go-bindataで静的アセットファイルをgoのバイナリーに組み込むやり方をここではじめて知りました。

次回

次回は10月24日で、Kubernetes上で動かすアプリケーション開発のデバックとテストについてだそうです。 こちらも楽しみですね。

まとめ

Kubernetes Meetup Tokyoには初参加で、自分のKubernetesについての理解が、GKEにslack botをdeployしてみる程度*1でしたがとても勉強になり参加してよかったです。

せっかくなのでゼロから始めるKubernetes Controllerをおってみる

ゼロから始めるKubernetes Controllerで、ReplicaSetをapplyした際のControllerの挙動が該当コードのリンクつきで解説されているので、追えるところまでおってみました。kubernetesのversionは1.16です

kubectl apply

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_18.jpg?13730928 https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=19

kubectlでReplicaSetをapplyして、api-serverで処理されている前提です。

ReplicaSet Controllerの起動

slideでは、ReplicaSetControllerReplicaSetが生成されたことを検知したところから解説されていますが、起動処理を簡単におってみました。

ReplicaSetController自体は、kube-controller-manager binaryに含まれているので、起動処理はkube-controller-managerコマンドから始まります。

// Run runs the KubeControllerManagerOptions.  This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
  // ...
  if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
            klog.Fatalf("error starting controllers: %v", err)
    }
  // ...
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/controllermanager.go#L234

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
  controllers := map[string]InitFunc{}
  // ...
  controllers["replicaset"] = startReplicaSetController
  // ...
  return controllers
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/controllermanager.go#L386

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
   // ...
  for controllerName, initFn := range controllers {
        // ...
        debugHandler, started, err := initFn(ctx)
        // ...
  }

  return nil

kubernetes/controllermanager.go at release-1.16 · kubernetes/kubernetes · GitHub

各controllerの起動処理を実行しているようです。 肝心のReplicaSetControllerの起動処理をみてみます。

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    // ...
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/apps.go#L69:6

ReplicaSetControllerの生成と起動処理を別goroutineで実行しているようです。

func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    // ...
    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L177:34

指定された数のworkerを起動して、stopChでblockしています。

func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L432:34

肝心のworkerはqueueからtaskを取得して、ReplicaSetController.syncHandler()処理を呼び出しています。 このqueueまわりもslideの後半で解説されていましたが、概要としてはapi-serverからcontrollerが関心のあるEventに絞って取得していると理解しています。

func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    //...
    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L163

ReplicaSetController.syncHandlerにはsyncReplicaSetが生成処理時にセットされています。

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    // ...
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    // ...
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    // ...
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    // ...
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    // ...

    // Ignore inactive pods.
    filteredPods := controller.FilterActivePods(allPods)

    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    // ...

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    // ...

概要としては、処理対象のReplicaSetとfilterlingしたPodを取得して、ReplicaSetController.manageReplicas()を呼んでいます。 これでようやくslideの最初の処理のたどり着きました。

ReplicaSetController.manageReplicas()

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_19.jpg?13730929 https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=20

// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    // ...
    if diff < 0 {
        diff *= -1
        // ...
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil && errors.IsTimeout(err) {
                // ...
                return nil
            }
            return err
        })
        // ...

        return err

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L459:34

func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        curSuccesses := batchSize - len(errCh)
        successes += curSuccesses
        if len(errCh) > 0 {
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L658:6

PodReplicaSetのreplica数の差分をとって、Podの作成処理を実行していますね。 slowStartBatch()は作成処理を並列で走らせるhelper関数のようです。 段階的に一度に起動するgoroutineの数を増やしていく処理の書き方として非常に参考になります。(IntMin()のような処理はstd libで欲しいと思ってしまう) ReplicaSetController.podControlはinterfaceで、実際のPod作成処理はReadPodControlが実装しています。

func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
    if err := validateControllerRef(controllerRef); err != nil {
        return err
    }
    return r.createPods("", namespace, template, controllerObject, controllerRef)
}

kubernetes/controller_utils.go at 2f76f5e63872a40ac08056289a6c52b4f6250154 · kubernetes/kubernetes · GitHub

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
    pod, err := GetPodFromTemplate(template, object, controllerRef)
    // ...
    if len(nodeName) != 0 {
        pod.Spec.NodeName = nodeName
    }
    // ...
    newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
    if err != nil {
        r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
        return err
    }
    // ...
    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/controller_utils.go#L567

確かにslideのとおり、r.createPods("", namespace, template, controllerObject, controllerRef) としてnodeNameが空のPodを生成しているのがわかります。

Schedulerがenqueue

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_20.jpg?13730930

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=21

func AddAllEventHandlers(...) {
    // ...
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, schedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToSchedulingQueue,
                UpdateFunc: sched.updatePodInSchedulingQueue,
                DeleteFunc: sched.deletePodFromSchedulingQueue,
            },
        },
    )

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/eventhandlers.go#L418

func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/eventhandlers.go#L323:6

SchedulerpodInformerにevent handlerを登録する際に、podがassigne(NodeNameが設定されている)されていないことを条件とするfilterを設定していることがわかります。

kubeletはskip

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_21.jpg?13730931

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=22

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/config/apiserver.go#L32

kubeletのapi serverへのclient生成処理時に、PodのHost名が(おそらく)自身のnode名と一致するFilterを設定しているようです。

Schedulerがnode nameを設定

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_22.jpg?13730932

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=23

func (sched *Scheduler) scheduleOne() {
    // ...
    pod := sched.NextPod()
    // ...
    scheduleResult, err := sched.schedule(pod, pluginContext)
    // ...
    assumedPod := pod.DeepCopy()
    // ...

    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    // ...
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/scheduler.go#L516

func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // ...
    assumed.Spec.NodeName = host
    // ...
    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/scheduler.go#L447:25

Scheduling処理自体は一大Topicですが、流れとしては、なんらかの方法でNode名を選出して、Podのnode名に指定していることがわかります。

kubeletがコンテナを起動

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_23.jpg?13730933

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=24

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
  // ...
    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", container); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }
    // ...

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L803

kubeletの処理はまったく追えていないのですが、コンテナを起動しているような処理を実行しています。

kubeletPodのstatusを更新

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_24.jpg?13730934

func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // pull out the required options
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType

    // ...

    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kubelet.go#L1481

PodのTerminating

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_25.jpg?13730935

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=26

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_26.jpg?13730936

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=27

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            // If the pod is in a terminated state, there is no pod worker to
            // handle the work item. Check if the DeletionTimestamp has been
            // set, and force a status update to trigger a pod deletion request
            // to the apiserver.
            kl.statusManager.TerminatePod(pod)
        }
        return
    }
    // ...

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kubelet.go#L1999

ReplicaSetControllerPodを削除

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_27.jpg?13730937

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=28

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    // ...
    }
    if diff < 0 {
        // ...
    } else if diff > 0 {
        // ...
        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, diff)
        // ...
        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // ...
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }
    }

    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L459

ここで再び、ReplicaSetController.manageReplicas()に戻ってきました。今度は、specよりも実際のPodが多いので、削除処理が走るようです。削除処理はシンプルに削除する数だけgoroutineを起動するようです。

Reconcile

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_28.jpg?13730938

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_29.jpg?13730939

ここまで、非常に簡単にですがslideにそって、ReplicaSetControllerを中心に該当コードを追いかけてみました。 kubernetesのcodeを初めて読んだのですが、各componentの実装がだいたいどのあたりあるのかを知るためのとっかかりとして非常に参考になりました。slideの後半では、InformerWorkQueueについても解説されているので、是非そちらも追いかけてみようと思います。

*1:会社のブログで記事にしました。 blog.howtelevision.co.jp