Kubernetes Meetup Tokyo #23にいってきました

2019年9月27日に開催されたKubernetes Meetup Tokyo #23にブログ枠で参加させていただいたので、その模様について書いていきます。

会場

会場は渋谷ストリームの隣に誕生した渋谷スクランブルスクエアです。ビルの正式オープンが11月からとのことで絶賛内装工事中でした。 渋谷駅直結でとても便利な立地です。スポンサーはCyberAgent社です。

f:id:yamaguchi7073xtt:20190928172718j:plainf:id:yamaguchi7073xtt:20190928172803j:plainf:id:yamaguchi7073xtt:20190928172807j:plain
Session後の懇談会の様子

Session

ゼロから始めるKubernetes Controller

技術書典7で買わせていただいた、実践入門Kubernetes カスタムコントローラへの道の著者であられるバルゴさんによるKubernetes Controllerについての発表。

f:id:yamaguchi7073xtt:20190928180008j:plain speakerdeck.com

Controllerの概要から内部の実装についてまで説明されている100P超えの力作スライドです。 ReplicaSetをapplyしてからデリバリされるまでの処理を該当ソースへの参照箇所まで添えて説明してくれており開始10分でこれてよかったと思えました。

kubebuilder/controller-runtime入門 with v2 updates

kfyharukzさんによるkubebuilder v2の紹介/デモ。

kubernetesのAPIを拡張するためのSDK kubebuilderについての発表です。

www.slideshare.net

Kubernetesのbuiltin Resouceの理解もまだあやしい自分にとっては発展的な内容でした。CKA合格できたらCustom Resourceにも挑戦したいです。

Kubernetes 1.16: SIG-API Machineryの変更内容

Introduction of Operator Frameworkが行われる予定でしたが、発表者の方が体調不良とのことで内容が変更になりました。

LadicleさんによるKubernetes 1.16: SIG-API Machineryの変更内容qiita.com そもそもSIGという単語がわかっていませんでした。調べてみたところKubernetes Projectの開発単位という感じでしょうか。 興味があるSIGから追いかけてみるととっかかりとしてはよいというアドバイスもいただきました。

github.com

zlab社がQiitaでKubernetesやGo関連で参考になる記事をたくさんあげてくれているのでフォローしていこうと思いました。

qiita.com

Kubernetes 1.16: SIG-CLI の変更内容

同じくzlab社すぱぶらさんによるKubernetes 1.16: SIG-CLI の変更内容です。 qiita.com

kubectlの実践的な解説で自分が打ったことがないコマンドやオプションばかりでした。kubectl debug は利用できたらとても便利そうなので待ち遠しいです。

LT

ClusterAPI v1alpha1 → v1alpha2

r_takaishiさんによるClusterAPIについて。 発表資料のリンクが見つけられず。 ClusterAPIについてはまったくわかっておらず。そもそもKubernetesってCluster(特にcontroll plane)は作成されている前提だと考えていたので気になるところです。 技術書典7では買えていなかったはじめるCluster API読んで出直します。

f:id:yamaguchi7073xtt:20190928193713j:plain

自動化はshell-operator とともに。

nwiizoさんさんによるshell(bash)とkubernetesについて。 そもそも、operatorについてのわかっていないので、なんかbashでもkubernetesの処理にはさめるんだなくらいの理解しかできませんでした。 ここは議論のあるところだと思いますが、自分は以下の点からあまりbashを本番関連の処理に組み込みたくないと考えているのですがどうなんでしょうか。

  • network処理には必ずtimeout設定したい
  • error handling
  • logging(structured, leveling,...)
  • signal handling(gracefully shutdown, resource cleanup等)
  • test書きたい
  • 依存を明示

(bashでできないことはないと思うのですが上記のことやろうとすると肥大化するかscriptの手軽さが結局失われる)

自作 Controller による Secret の配布と収集 - unblee

unbleeさんによる speakerdeck.com

wantedly社でのKubernetes運用上の課題をControllerを作成して解決されたお話でした。 1-MicroService 1-Namespaceで運用されているそうです、実運用されている方々のNamespaceの切り方についてはもっとお話を伺ってみたいと思いました。

懇談会

SessionとLTの間に30分程度の懇談会の時間が設けられています。(飲食物の提供はCyberAgent社!) たまたま技術書典7で買って、当日も読んでいたカスタムコントローラへの道の著者のバルゴさんが発表されていたので、本買いましたとご挨拶させていただきました。またCKA、CKADをお持ちとのことで、CKAのアドバイスも聞けました。

qiita.com

Ladicleさんの発表の際に用いられていたQiitaのiconどこかでみたことあるなー思っていたのですが、Software Design 2017年9月号の特集 Web技術【超】入門いま一度振り返るWebのしくみと開発方法でweb serverの実装をgoでやられている方であることを思い出しました。 go-bindataで静的アセットファイルをgoのバイナリーに組み込むやり方をここではじめて知りました。

次回

次回は10月24日で、Kubernetes上で動かすアプリケーション開発のデバックとテストについてだそうです。 こちらも楽しみですね。

まとめ

Kubernetes Meetup Tokyoには初参加で、自分のKubernetesについての理解が、GKEにslack botをdeployしてみる程度*1でしたがとても勉強になり参加してよかったです。

せっかくなのでゼロから始めるKubernetes Controllerをおってみる

ゼロから始めるKubernetes Controllerで、ReplicaSetをapplyした際のControllerの挙動が該当コードのリンクつきで解説されているので、追えるところまでおってみました。kubernetesのversionは1.16です

kubectl apply

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_18.jpg?13730928 https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=19

kubectlでReplicaSetをapplyして、api-serverで処理されている前提です。

ReplicaSet Controllerの起動

slideでは、ReplicaSetControllerReplicaSetが生成されたことを検知したところから解説されていますが、起動処理を簡単におってみました。

ReplicaSetController自体は、kube-controller-manager binaryに含まれているので、起動処理はkube-controller-managerコマンドから始まります。

// Run runs the KubeControllerManagerOptions.  This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
  // ...
  if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
            klog.Fatalf("error starting controllers: %v", err)
    }
  // ...
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/controllermanager.go#L234

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
  controllers := map[string]InitFunc{}
  // ...
  controllers["replicaset"] = startReplicaSetController
  // ...
  return controllers
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/controllermanager.go#L386

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
   // ...
  for controllerName, initFn := range controllers {
        // ...
        debugHandler, started, err := initFn(ctx)
        // ...
  }

  return nil

kubernetes/controllermanager.go at release-1.16 · kubernetes/kubernetes · GitHub

各controllerの起動処理を実行しているようです。 肝心のReplicaSetControllerの起動処理をみてみます。

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    // ...
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/apps.go#L69:6

ReplicaSetControllerの生成と起動処理を別goroutineで実行しているようです。

func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    // ...
    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L177:34

指定された数のworkerを起動して、stopChでblockしています。

func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L432:34

肝心のworkerはqueueからtaskを取得して、ReplicaSetController.syncHandler()処理を呼び出しています。 このqueueまわりもslideの後半で解説されていましたが、概要としてはapi-serverからcontrollerが関心のあるEventに絞って取得していると理解しています。

func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    //...
    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L163

ReplicaSetController.syncHandlerにはsyncReplicaSetが生成処理時にセットされています。

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    // ...
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    // ...
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    // ...
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    // ...
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    // ...

    // Ignore inactive pods.
    filteredPods := controller.FilterActivePods(allPods)

    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    // ...

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    // ...

概要としては、処理対象のReplicaSetとfilterlingしたPodを取得して、ReplicaSetController.manageReplicas()を呼んでいます。 これでようやくslideの最初の処理のたどり着きました。

ReplicaSetController.manageReplicas()

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_19.jpg?13730929 https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=20

// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    // ...
    if diff < 0 {
        diff *= -1
        // ...
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil && errors.IsTimeout(err) {
                // ...
                return nil
            }
            return err
        })
        // ...

        return err

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L459:34

func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        curSuccesses := batchSize - len(errCh)
        successes += curSuccesses
        if len(errCh) > 0 {
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L658:6

PodReplicaSetのreplica数の差分をとって、Podの作成処理を実行していますね。 slowStartBatch()は作成処理を並列で走らせるhelper関数のようです。 段階的に一度に起動するgoroutineの数を増やしていく処理の書き方として非常に参考になります。(IntMin()のような処理はstd libで欲しいと思ってしまう) ReplicaSetController.podControlはinterfaceで、実際のPod作成処理はReadPodControlが実装しています。

func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
    if err := validateControllerRef(controllerRef); err != nil {
        return err
    }
    return r.createPods("", namespace, template, controllerObject, controllerRef)
}

kubernetes/controller_utils.go at 2f76f5e63872a40ac08056289a6c52b4f6250154 · kubernetes/kubernetes · GitHub

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
    pod, err := GetPodFromTemplate(template, object, controllerRef)
    // ...
    if len(nodeName) != 0 {
        pod.Spec.NodeName = nodeName
    }
    // ...
    newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
    if err != nil {
        r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
        return err
    }
    // ...
    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/controller_utils.go#L567

確かにslideのとおり、r.createPods("", namespace, template, controllerObject, controllerRef) としてnodeNameが空のPodを生成しているのがわかります。

Schedulerがenqueue

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_20.jpg?13730930

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=21

func AddAllEventHandlers(...) {
    // ...
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, schedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToSchedulingQueue,
                UpdateFunc: sched.updatePodInSchedulingQueue,
                DeleteFunc: sched.deletePodFromSchedulingQueue,
            },
        },
    )

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/eventhandlers.go#L418

func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/eventhandlers.go#L323:6

SchedulerpodInformerにevent handlerを登録する際に、podがassigne(NodeNameが設定されている)されていないことを条件とするfilterを設定していることがわかります。

kubeletはskip

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_21.jpg?13730931

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=22

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/config/apiserver.go#L32

kubeletのapi serverへのclient生成処理時に、PodのHost名が(おそらく)自身のnode名と一致するFilterを設定しているようです。

Schedulerがnode nameを設定

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_22.jpg?13730932

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=23

func (sched *Scheduler) scheduleOne() {
    // ...
    pod := sched.NextPod()
    // ...
    scheduleResult, err := sched.schedule(pod, pluginContext)
    // ...
    assumedPod := pod.DeepCopy()
    // ...

    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    // ...
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/scheduler.go#L516

func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // ...
    assumed.Spec.NodeName = host
    // ...
    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/scheduler.go#L447:25

Scheduling処理自体は一大Topicですが、流れとしては、なんらかの方法でNode名を選出して、Podのnode名に指定していることがわかります。

kubeletがコンテナを起動

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_23.jpg?13730933

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=24

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
  // ...
    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", container); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }
    // ...

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L803

kubeletの処理はまったく追えていないのですが、コンテナを起動しているような処理を実行しています。

kubeletPodのstatusを更新

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_24.jpg?13730934

func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // pull out the required options
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType

    // ...

    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kubelet.go#L1481

PodのTerminating

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_25.jpg?13730935

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=26

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_26.jpg?13730936

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=27

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            // If the pod is in a terminated state, there is no pod worker to
            // handle the work item. Check if the DeletionTimestamp has been
            // set, and force a status update to trigger a pod deletion request
            // to the apiserver.
            kl.statusManager.TerminatePod(pod)
        }
        return
    }
    // ...

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kubelet.go#L1999

ReplicaSetControllerPodを削除

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_27.jpg?13730937

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=28

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    // ...
    }
    if diff < 0 {
        // ...
    } else if diff > 0 {
        // ...
        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, diff)
        // ...
        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // ...
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }
    }

    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L459

ここで再び、ReplicaSetController.manageReplicas()に戻ってきました。今度は、specよりも実際のPodが多いので、削除処理が走るようです。削除処理はシンプルに削除する数だけgoroutineを起動するようです。

Reconcile

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_28.jpg?13730938

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_29.jpg?13730939

ここまで、非常に簡単にですがslideにそって、ReplicaSetControllerを中心に該当コードを追いかけてみました。 kubernetesのcodeを初めて読んだのですが、各componentの実装がだいたいどのあたりあるのかを知るためのとっかかりとして非常に参考になりました。slideの後半では、InformerWorkQueueについても解説されているので、是非そちらも追いかけてみようと思います。

*1:会社のブログで記事にしました。 blog.howtelevision.co.jp

nushell/bookに日本語翻訳のPRだしたらマージしてもらえました

f:id:yamaguchi7073xtt:20190907001301p:plain

nushell/bookというRust製shellのbook(document)に日本語翻訳のPRをだしたらmergeしてもらえました。

ドキュメントはこちらから読めます。

そもそもnushellとは A modern, GitHub-era shell written in Rustと謳っているとおり、Rustで書かれたshellです。主な特徴は、コマンド間の入出力(Stream)をテキストベースからデータ構造に拡張している点です。

https://raw.githubusercontent.com/nushell/nushell/master/images/nushell-autocomplete4.gif

その後、メインテナーの一人であられるjonathanさんからnushell/bookのrepositoryのメンバーに招待していただき、merge権限までもらえました。

ドキュメントだけではありますが、はじめてオープンソースのプロジェクトにPRをmergeしてもらえ、チームに招待してもらえたのはすごくうれしかったです。

今後は、nushell本体にRustのCodeでContributeすることを目指していきたいです。

RustConf 2019にいってきました

現地時間(PDT) 8/22 ~ 23、オレゴン州ポートランドで開催されたRustConfに参加してきたので、その模様を書いていこうと思います。

参加のきっかけ

Rustに関わっておられる方々がにどんな人達なのか実際に見てみたいと思い、ちょうどRustConfの開催時期に夏季休暇と有給で1週間休みがとれそうだったので、思い切っていってみることにしました。 一人海外旅行もアメリカも初めてでした。

道のり

成田空港からポートランド国際空港(PDX)まで、デルタ航空の直通便が就航しており、片道10時間程度です。時差はJST - 16時間。 会場はオレゴンコンベンションセンターで、空港からMaxLightRailという電車で20分程度の距離でした。 入国審査で、目的は観光で滞在日数は4日と答えたところ、"Very Short" と言われました。

f:id:yamaguchi7073xtt:20190824165539j:plain
portlandの場所

f:id:yamaguchi7073xtt:20190824170322j:plainf:id:yamaguchi7073xtt:20190824185813j:plain
PDXとオレゴンコンベンションセンターの入り口

1日目

RustConfは2日に渡って開催され、1日目は、いくつかのTraining Courseが用意されています。 あらかじめ、参加したいcourseのticketを購入しておく必要があり、今回はAsyncのcourseを選択しました。(これ以外は全て売り切れていました。)

f:id:yamaguchi7073xtt:20190824171920j:plainf:id:yamaguchi7073xtt:20190824173528j:plainf:id:yamaguchi7073xtt:20190824174103j:plain

Async Courseの内容は、Futureの概要/Conceptの説明や、async-stdのhandsonで、chatを作ってみるものでした。ちょうど、前日にasync/await syntaxがmergeされ、rustc 1.39.0-nightly (e44fdf979 2019-08-21) versionを利用しました。

f:id:yamaguchi7073xtt:20190824175016j:plain
`1.39`から`async/await`がstable !!

async-bookにそって進めていったのですが、よくあるsample codeのuseが漏れていて、book通りに進めていくとcompileが通らないことがおきました。するとすかさず(おそらく)参加者の一人の方がPRを送り(それがmergeされ)、「画面をリロードしてくれ、もう直ってるから」といって、sample codeのcompileが通るようになる場面がありました。

Rustの非同期関連については、まったくわかっておらず、今回のcourseの参加をきっかけに

あたりから読んでみようと思っています。

f:id:yamaguchi7073xtt:20190824184720j:plainf:id:yamaguchi7073xtt:20190824184712j:plain
irlnaさんによるasyncの冊子(ちなみに、2日目のspeakerでもあられる)

2日目

2日目が本番といったところで、参加者の人数は1日目よりはるかに多かったです。

f:id:yamaguchi7073xtt:20190824185618j:plain

f:id:yamaguchi7073xtt:20190824190055j:plain
開始前のkeynote会場(はじまると8,9割程度埋まっていました)

openingとclosingのkeynote以外は、2つの会場でSessionが行われ、各々好きなほうを聞きに行く形式でした。schedule

自分は以下のsessionに参加しました。sessionの内容はそのうちyoutubeにupされるかと思います。

特に印象的(理解できた)だったのは

  • mongoDBのGUIであるcompassのschema parser部分をperformanceをだすためにjsからrust/wasmを利用する構成に書き換えた話

  • Facebookで、Rustの導入に取り組まれているC歴30年の方が、Rustは今までで初めて、every roleでCを置き換えられる言語だ的なことをおっしゃっていたこと

f:id:yamaguchi7073xtt:20190824191845j:plain
rustの求人
f:id:yamaguchi7073xtt:20190824191913j:plain
awsにもrustの募集がある
f:id:yamaguchi7073xtt:20190824191839p:plain
sponsors

感想

実際にOpenSourceなprojectの活動に参加してみて、あらためて、こういった活動にContributeできるようなエンジニアになりたいという思いを持ちました。 (あとは、英語の冗談で笑えるようになりたい)

Rustでdoubly linked list

Rustでdoubly linked listを書いてみました。

use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;

type Link<T> = Rc<RefCell<Node<T>>>;

#[derive(Debug)]
struct Node<T> {
    value: T,
    prev: Option<Link<T>>,
    next: Option<Link<T>>,
}

impl<T> Node<T> {
    fn new(value: T) -> Rc<RefCell<Self>> {
        Rc::new(RefCell::new(Self {
            value,
            prev: None,
            next: None,
        }))
    }
}

#[derive(Default)]
pub struct LinkedList<T> {
    head: Option<Link<T>>,
    tail: Option<Link<T>>,
    length: usize,
}

impl<T> LinkedList<T> {
    pub fn new() -> Self {
        Self {
            head: None,
            tail: None,
            length: 0,
        }
    }

    pub fn len(&self) -> usize {
        self.length
    }

    pub fn append(&mut self, v: T) {
        let node = Node::new(v);
        match self.tail.take() {
            Some(old_tail) => {
                old_tail.borrow_mut().next = Some(Rc::clone(&node));
                node.borrow_mut().prev = Some(old_tail);
            }
            None => {
                // first element
                debug_assert_eq!(self.len(), 0);
                self.head = Some(Rc::clone(&node));
            }
        }

        self.tail = Some(node);
        self.length += 1;
    }

    pub fn pop(&mut self) -> Option<T> {
        match self.tail.take() {
            Some(tail) => {
                if let Some(prev) = tail.borrow_mut().prev.take() {
                    prev.borrow_mut().next = None;
                    self.tail = Some(prev);
                } else {
                    // we take last element
                    debug_assert_eq!(self.len(), 1);
                    self.head = None;
                }
                self.length -= 1;
                let v = Rc::try_unwrap(tail) // Rc<RefCell<Node<T>> -> RefCell<Node<T>>
                    .ok() // Result<RefCell<Node<T>>, Rc<RefCell<Node<T>>>> -> Option<RefCell<Node<T>>>
                    .expect("Failed to Rc::try_unwrap tail node") // RefCell<Node<T>>
                    .into_inner() // RefCell<Node<T>> -> Node<T>
                    .value;
                Some(v)
            }
            None => None,
        }
    }

    pub fn iter(&self) -> Iter<T> {
        Iter {
            current: if self.len() == 0 {
                None
            } else {
                Some(Rc::clone(&self.head.as_ref().unwrap()))
            },
        }
    }
}

参照を相互に保持したい場合、Rc<RefCell<T>>でWrapしてやる必要があります。 Goならpointerを相互のfieldに代入すればよいだけなのですが、Rustの場合、Rcで参照の所有者が複数いることを、RefCellで、参照先からでも値が変更 できることを表現する必要があります。 型をみるだけで、この値は複数箇所で保持されていて、さらに変更されるうることまでわかりますね。 また、Rc::clone()node.clone()のようにmethod呼び出しではなく、Rc::clone(&node)のように明示的に、reference countedのcloneであることがわかるように呼ぶのが慣習だそうです。(node.clone()とあると、重たい処理かもしれないと思ってしまうからでしょうか。)

Rc<RefCell<T>>Tのfieldの所有権を奪おうとすると大変で、Rc::try_unwrap()して、RefCell<T>に変換してさらにRefCell::into_inner() で、T型にもどしてやる必要があります。

続いて、fmt::Debugを実装します。

impl<T: fmt::Display + Clone> fmt::Debug for LinkedList<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        let iter = self.iter();
        write!(f, "{{ head")?;
        for v in iter {
            write!(f, " -> {}", v)?;
        }
        write!(f, " }}")
    }
}

この実装がないと、println!("{:?}", list); としたときに、Node同士が相互参照しているので、循環参照してしまい、stackoverflowしてしまいます。 これを回避するには、どちからの参照をrc::Weakにしてもよいと思ったのですが、prev側を無視することにしました。

最後にiteratorを実装します。

impl<T: Clone> IntoIterator for LinkedList<T> {
    type Item = T;
    type IntoIter = Iter<T>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

pub struct Iter<T> {
    current: Option<Link<T>>,
}

impl<T: Clone> Iterator for Iter<T> {
    type Item = T;
    fn next(&mut self) -> Option<Self::Item> {
        match self.current.take() {
            None => None,
            Some(curr) => {
                let curr = curr.borrow();
                let v = curr.value.clone();
                match curr.next {
                    None => {
                        self.current = None;
                    }
                    Some(ref next) => {
                        self.current = Some(Rc::clone(next));
                    }
                }
                Some(v)
            }
        }
    }
}

impl<T: Clone> DoubleEndedIterator for Iter<T> {
    fn next_back(&mut self) -> Option<T> {
        match self.current.take() {
            None => None,
            Some(curr) => {
                let curr = curr.borrow();
                match curr.prev {
                    None => {
                        self.current = None;
                        None
                    }
                    Some(ref prev) => {
                        self.current = Some(Rc::clone(prev));
                        Some(prev.borrow().value.clone())
                    }
                }
            }
        }
    }
}

TにはClone boundを設けて楽をしました。DoubleEndedIteratorを実装すると

    #[test]
    fn reverse() {
        let mut list: LinkedList<i32> = LinkedList::new();
        (0..10).for_each(|n| list.append(n));

        let mut iter = list.iter();
        assert_eq!(iter.next(), Some(0));
        assert_eq!(iter.next(), Some(1));
        assert_eq!(iter.next(), Some(2));
        assert_eq!(iter.next(), Some(3));
        assert_eq!(iter.next_back(), Some(3));
        assert_eq!(iter.next_back(), Some(2));
        assert_eq!(iter.next_back(), Some(1));
        assert_eq!(iter.next_back(), Some(0));
        assert_eq!(iter.next_back(), None);
    }

このように、戻れるようになりました。 sourceはこちら

参考

Rust env_loggerの出力に色をつける

env_loggerの出力に色をつけたかったのですが、exampleが見つからず、docを読んだ結果以下のような処理になりました。

[dependencies]
log = "0.4.8"
env_logger = "0.6.2"
use env_logger::{fmt::Color, Builder};
use log::{Level,trace,debug,info,warn,error};
use std::io::Write;

fn init_logger() {
    let mut builder = Builder::new();

    builder.format(|buf, record| {
        let level_color = match record.level() {
            Level::Trace => Color::White,
            Level::Debug => Color::Blue,
            Level::Info => Color::Green,
            Level::Warn => Color::Yellow,
            Level::Error => Color::Red,
        };
        let mut level_style = buf.style();
        level_style.set_color(level_color);

        writeln!(
            buf,
            "{level} {file}:{line} {args}",
            level = level_style.value(record.level()),
            args = level_style.value(record.args()),
            file = level_style.value(&record.file().unwrap_or("____unknown")[4..]), // src/file.rs -> file.rs
            line = level_style.value(record.line().unwrap_or(0)),
        )
    });
    builder.filter(None, log::LevelFilter::Trace);
    builder.write_style(env_logger::WriteStyle::Auto);

    builder.init();
}

fn main() {
    init_logger();

    trace!("trace");
    debug!("debug");
    info!("info");
    warn!("warn");
    error!("error");
}