2019年9月27日に開催されたKubernetes Meetup Tokyo #23にブログ枠で参加させていただいたので、その模様について書いていきます。
会場
会場は渋谷ストリームの隣に誕生した渋谷スクランブルスクエアです。ビルの正式オープンが11月からとのことで絶賛内装工事中でした。 渋谷駅直結でとても便利な立地です。スポンサーはCyberAgent社です。
Session
ゼロから始めるKubernetes Controller
技術書典7で買わせていただいた、実践入門Kubernetes カスタムコントローラへの道の著者であられるバルゴさんによるKubernetes Controllerについての発表。
Controllerの概要から内部の実装についてまで説明されている100P超えの力作スライドです。 ReplicaSetをapplyしてからデリバリされるまでの処理を該当ソースへの参照箇所まで添えて説明してくれており開始10分でこれてよかったと思えました。
kubebuilder/controller-runtime入門 with v2 updates
kfyharukzさんによるkubebuilder v2の紹介/デモ。
kubernetesのAPIを拡張するためのSDK kubebuilderについての発表です。
www.slideshare.net
Kubernetesのbuiltin Resouceの理解もまだあやしい自分にとっては発展的な内容でした。CKA合格できたらCustom Resourceにも挑戦したいです。
Kubernetes 1.16: SIG-API Machineryの変更内容
Introduction of Operator Frameworkが行われる予定でしたが、発表者の方が体調不良とのことで内容が変更になりました。
LadicleさんによるKubernetes 1.16: SIG-API Machineryの変更内容qiita.com そもそもSIGという単語がわかっていませんでした。調べてみたところKubernetes Projectの開発単位という感じでしょうか。 興味があるSIGから追いかけてみるととっかかりとしてはよいというアドバイスもいただきました。
zlab社がQiitaでKubernetesやGo関連で参考になる記事をたくさんあげてくれているのでフォローしていこうと思いました。
Kubernetes 1.16: SIG-CLI の変更内容
同じくzlab社のすぱぶらさんによるKubernetes 1.16: SIG-CLI の変更内容です。 qiita.com
kubectlの実践的な解説で自分が打ったことがないコマンドやオプションばかりでした。kubectl debug
は利用できたらとても便利そうなので待ち遠しいです。
LT
ClusterAPI v1alpha1 → v1alpha2
r_takaishiさんによるClusterAPIについて。 発表資料のリンクが見つけられず。 ClusterAPIについてはまったくわかっておらず。そもそもKubernetesってCluster(特にcontroll plane)は作成されている前提だと考えていたので気になるところです。 技術書典7では買えていなかったはじめるCluster API読んで出直します。
自動化はshell-operator とともに。
nwiizoさんさんによるshell(bash)とkubernetesについて。 そもそも、operatorについてのわかっていないので、なんかbashでもkubernetesの処理にはさめるんだなくらいの理解しかできませんでした。 ここは議論のあるところだと思いますが、自分は以下の点からあまりbashを本番関連の処理に組み込みたくないと考えているのですがどうなんでしょうか。
- network処理には必ずtimeout設定したい
- error handling
- logging(structured, leveling,...)
- signal handling(gracefully shutdown, resource cleanup等)
- test書きたい
- 依存を明示
(bashでできないことはないと思うのですが上記のことやろうとすると肥大化するかscriptの手軽さが結局失われる)
自作 Controller による Secret の配布と収集 - unblee
wantedly社でのKubernetes運用上の課題をControllerを作成して解決されたお話でした。 1-MicroService 1-Namespaceで運用されているそうです、実運用されている方々のNamespaceの切り方についてはもっとお話を伺ってみたいと思いました。
懇談会
SessionとLTの間に30分程度の懇談会の時間が設けられています。(飲食物の提供はCyberAgent社!) たまたま技術書典7で買って、当日も読んでいたカスタムコントローラへの道の著者のバルゴさんが発表されていたので、本買いましたとご挨拶させていただきました。またCKA、CKADをお持ちとのことで、CKAのアドバイスも聞けました。
Ladicleさんの発表の際に用いられていたQiitaのiconどこかでみたことあるなー思っていたのですが、Software Design 2017年9月号の特集 Web技術【超】入門いま一度振り返るWebのしくみと開発方法でweb serverの実装をgoでやられている方であることを思い出しました。
go-bindata
で静的アセットファイルをgoのバイナリーに組み込むやり方をここではじめて知りました。
次回
次回は10月24日で、Kubernetes上で動かすアプリケーション開発のデバックとテストについてだそうです。 こちらも楽しみですね。
まとめ
Kubernetes Meetup Tokyoには初参加で、自分のKubernetesについての理解が、GKEにslack botをdeployしてみる程度*1でしたがとても勉強になり参加してよかったです。
せっかくなのでゼロから始めるKubernetes Controllerをおってみる
ゼロから始めるKubernetes Controllerで、ReplicaSetをapplyした際のControllerの挙動が該当コードのリンクつきで解説されているので、追えるところまでおってみました。kubernetesのversionは1.16です
kubectl apply
https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=19
kubectl
でReplicaSetをapplyして、api-serverで処理されている前提です。
ReplicaSet Controller
の起動
slideでは、ReplicaSetController
がReplicaSet
が生成されたことを検知したところから解説されていますが、起動処理を簡単におってみました。
ReplicaSetController
自体は、kube-controller-manager
binaryに含まれているので、起動処理はkube-controller-manager
コマンドから始まります。
// Run runs the KubeControllerManagerOptions. This should never exit. func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { // ... if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil { klog.Fatalf("error starting controllers: %v", err) } // ... }
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc { controllers := map[string]InitFunc{} // ... controllers["replicaset"] = startReplicaSetController // ... return controllers }
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error { // ... for controllerName, initFn := range controllers { // ... debugHandler, started, err := initFn(ctx) // ... } return nil
kubernetes/controllermanager.go at release-1.16 · kubernetes/kubernetes · GitHub
各controllerの起動処理を実行しているようです。
肝心のReplicaSetController
の起動処理をみてみます。
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) { // ... go replicaset.NewReplicaSetController( ctx.InformerFactory.Apps().V1().ReplicaSets(), ctx.InformerFactory.Core().V1().Pods(), ctx.ClientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop) return nil, true, nil }
ReplicaSetController
の生成と起動処理を別goroutineで実行しているようです。
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { // ... for i := 0; i < workers; i++ { go wait.Until(rsc.worker, time.Second, stopCh) } <-stopCh }
指定された数のworkerを起動して、stopCh
でblockしています。
func (rsc *ReplicaSetController) worker() { for rsc.processNextWorkItem() { } } func (rsc *ReplicaSetController) processNextWorkItem() bool { key, quit := rsc.queue.Get() if quit { return false } defer rsc.queue.Done(key) err := rsc.syncHandler(key.(string)) if err == nil { rsc.queue.Forget(key) return true } utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err)) rsc.queue.AddRateLimited(key) return true }
肝心のworkerはqueueからtaskを取得して、ReplicaSetController.syncHandler()
処理を呼び出しています。
このqueueまわりもslideの後半で解説されていましたが、概要としてはapi-serverからcontrollerが関心のあるEventに絞って取得していると理解しています。
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController { //... rsc.syncHandler = rsc.syncReplicaSet return rsc }
ReplicaSetController.syncHandler
にはsyncReplicaSet
が生成処理時にセットされています。
// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled, // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // invoked concurrently with the same key. func (rsc *ReplicaSetController) syncReplicaSet(key string) error { // ... namespace, name, err := cache.SplitMetaNamespaceKey(key) // ... rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) // ... selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) // ... allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) // ... // Ignore inactive pods. filteredPods := controller.FilterActivePods(allPods) // NOTE: filteredPods are pointing to objects from cache - if you need to // modify them, you need to copy it first. filteredPods, err = rsc.claimPods(rs, selector, filteredPods) // ... var manageReplicasErr error if rsNeedsSync && rs.DeletionTimestamp == nil { manageReplicasErr = rsc.manageReplicas(filteredPods, rs) } // ...
概要としては、処理対象のReplicaSet
とfilterlingしたPod
を取得して、ReplicaSetController.manageReplicas()
を呼んでいます。
これでようやくslideの最初の処理のたどり着きました。
ReplicaSetController.manageReplicas()
https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=20
// manageReplicas checks and updates replicas for the given ReplicaSet. // Does NOT modify <filteredPods>. // It will requeue the replica set in case of an error while creating/deleting pods. func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) // ... if diff < 0 { diff *= -1 // ... successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error { err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind)) if err != nil && errors.IsTimeout(err) { // ... return nil } return err }) // ... return err
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) { remaining := count successes := 0 for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) { errCh := make(chan error, batchSize) var wg sync.WaitGroup wg.Add(batchSize) for i := 0; i < batchSize; i++ { go func() { defer wg.Done() if err := fn(); err != nil { errCh <- err } }() } wg.Wait() curSuccesses := batchSize - len(errCh) successes += curSuccesses if len(errCh) > 0 { return successes, <-errCh } remaining -= batchSize } return successes, nil }
Pod
とReplicaSet
のreplica数の差分をとって、Pod
の作成処理を実行していますね。
slowStartBatch()
は作成処理を並列で走らせるhelper関数のようです。
段階的に一度に起動するgoroutineの数を増やしていく処理の書き方として非常に参考になります。(IntMin()
のような処理はstd libで欲しいと思ってしまう)
ReplicaSetController.podControl
はinterfaceで、実際のPod
作成処理はReadPodControl
が実装しています。
func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { if err := validateControllerRef(controllerRef); err != nil { return err } return r.createPods("", namespace, template, controllerObject, controllerRef) }
func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { pod, err := GetPodFromTemplate(template, object, controllerRef) // ... if len(nodeName) != 0 { pod.Spec.NodeName = nodeName } // ... newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod) if err != nil { r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err) return err } // ... return nil }
確かにslideのとおり、r.createPods("", namespace, template, controllerObject, controllerRef)
としてnodeName
が空のPod
を生成しているのがわかります。
Schedulerがenqueue
func AddAllEventHandlers(...) { // ... podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return !assignedPod(t) && responsibleForPod(t, schedulerName) case cache.DeletedFinalStateUnknown: if pod, ok := t.Obj.(*v1.Pod); ok { return !assignedPod(pod) && responsibleForPod(pod, schedulerName) } utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) return false default: utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) return false } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToSchedulingQueue, UpdateFunc: sched.updatePodInSchedulingQueue, DeleteFunc: sched.deletePodFromSchedulingQueue, }, }, )
func assignedPod(pod *v1.Pod) bool { return len(pod.Spec.NodeName) != 0 }
Scheduler
がpodInformer
にevent handlerを登録する際に、podがassigne(NodeName
が設定されている)されていないことを条件とするfilterを設定していることがわかります。
kubeletはskip
// NewSourceApiserver creates a config source that watches and pulls from the apiserver. func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) { lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName))) newSourceApiserverFromLW(lw, updates) }
kubelet
のapi serverへのclient生成処理時に、Pod
のHost名が(おそらく)自身のnode名と一致するFilterを設定しているようです。
Schedulerがnode nameを設定
func (sched *Scheduler) scheduleOne() { // ... pod := sched.NextPod() // ... scheduleResult, err := sched.schedule(pod, pluginContext) // ... assumedPod := pod.DeepCopy() // ... // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost err = sched.assume(assumedPod, scheduleResult.SuggestedHost) // ... }
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // ... assumed.Spec.NodeName = host // ... return nil }
Scheduling処理自体は一大Topicですが、流れとしては、なんらかの方法でNode名を選出して、Pod
のnode名に指定していることがわかります。
kubelet
がコンテナを起動
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { // ... // Step 6: start the init container. if container := podContainerChanges.NextInitContainerToStart; container != nil { // Start the next init container. if err := start("init container", container); err != nil { return } // Successfully started the container; clear the entry in the failure klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod)) } // ...
kubelet
の処理はまったく追えていないのですが、コンテナを起動しているような処理を実行しています。
kubelet
がPod
のstatusを更新
func (kl *Kubelet) syncPod(o syncPodOptions) error { // pull out the required options pod := o.pod mirrorPod := o.mirrorPod podStatus := o.podStatus updateType := o.updateType // ... // Generate final API pod status with pod and status manager status apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
Pod
のTerminating
// dispatchWork starts the asynchronous sync of the pod in a pod worker. // If the pod is terminated, dispatchWork func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { if kl.podIsTerminated(pod) { if pod.DeletionTimestamp != nil { // If the pod is in a terminated state, there is no pod worker to // handle the work item. Check if the DeletionTimestamp has been // set, and force a status update to trigger a pod deletion request // to the apiserver. kl.statusManager.TerminatePod(pod) } return } // ...
ReplicaSetController
がPod
を削除
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) // ... } if diff < 0 { // ... } else if diff > 0 { // ... // Choose which Pods to delete, preferring those in earlier phases of startup. podsToDelete := getPodsToDelete(filteredPods, diff) // ... errCh := make(chan error, diff) var wg sync.WaitGroup wg.Add(diff) for _, pod := range podsToDelete { go func(targetPod *v1.Pod) { defer wg.Done() if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil { // ... errCh <- err } }(pod) } wg.Wait() select { case err := <-errCh: // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit. if err != nil { return err } default: } } return nil }
ここで再び、ReplicaSetController.manageReplicas()
に戻ってきました。今度は、specよりも実際のPod
が多いので、削除処理が走るようです。削除処理はシンプルに削除する数だけgoroutineを起動するようです。
Reconcile
ここまで、非常に簡単にですがslideにそって、ReplicaSetController
を中心に該当コードを追いかけてみました。
kubernetesのcodeを初めて読んだのですが、各componentの実装がだいたいどのあたりあるのかを知るためのとっかかりとして非常に参考になりました。slideの後半では、Informer
やWorkQueue
についても解説されているので、是非そちらも追いかけてみようと思います。
*1:会社のブログで記事にしました。 blog.howtelevision.co.jp