Rustでgitのtagをbumpする

同じチームのメンバーがnodegitというnodeのlibgit2のbindingを利用して、便利なツールを作っているのをみて各言語にgitを操作するためのライブラリーへのbindingがあることを知りました。 そこで、今回はRustのlibgit2 bindingであるgit2-rsを利用して、gitのtagをbumpして、remoteにpushするcliを作ってみようと思います。 sourceはこちら

作ったcli

f:id:yamaguchi7073xtt:20200405224835g:plain
git-bump

実行するとlocalのtag一覧を取得して、semantic versionでsortし、bumpするversionを選択します。 versionを選択してもらったら、現在のHEADを対象にtagを作成し、remoteにpushします。

bump処理

[dependencies]
clap = "2.33.0"
git2 = "0.13.0"
tracing = "0.1.13"
tracing-subscriber = "0.2.3"
log = "0.4.8"
semver = "0.9.0"
anyhow = "1.0.27"
colored = "1.9.3"
dialoguer = "0.5.0"
pub mod cli;

use anyhow::anyhow;
use colored::*;
use dialoguer::theme::ColorfulTheme;
use semver::{SemVerError, Version};
use std::io::{self, Write};
use std::result::Result as StdResult;
use std::borrow::Cow;
use tracing::{debug, warn};

#[derive(Debug, PartialEq, Eq)]
pub enum Bump {
    Major,
    Minor,
    Patch,
}

type Result<T> = std::result::Result<T, anyhow::Error>;

pub struct Config {
    pub prefix: Option<String>,
    pub repository_path: Option<String>,
    #[doc(hidden)]
    pub __non_exhaustive: (), // https://xaeroxe.github.io/init-struct-pattern/
}

impl Default for Config {
    fn default() -> Self {
        Self {
            prefix: Some("v".to_owned()),
            repository_path: None,
            __non_exhaustive: (),
        }
    }
}

impl Config {
    pub fn bump(self) -> Result<()> {
        self.build()?.bump()
    }

    fn build(self) -> Result<Bumper> {
        let repo = match self.repository_path {
            Some(path) => git2::Repository::open(&path)?,
            None => git2::Repository::open_from_env()?,
        };
        Ok(Bumper {
            prefix: self.prefix,
            repo,
            cfg: git2::Config::open_default()?,
            w: io::stdout(),
        })
    }
}

struct Bumper {
    prefix: Option<String>,
    repo: git2::Repository,
    cfg: git2::Config,
    w: io::Stdout,
}

impl Bumper {
    fn bump(mut self) -> Result<()> {
        let pattern = self.prefix.as_deref().map(|p| format!("{}*", p));
        let tags = self.repo.tag_names(pattern.as_deref())?;
        debug!(
            "found {} tags (pattern: {})",
            tags.len(),
            pattern.unwrap_or("".to_owned())
        );

        let (mut versions, errs) = self.parse_tags(tags);
        errs.into_iter().for_each(|e| match e {
            (tag, semver::SemVerError::ParseError(e)) => {
                warn!("malformed semantic version: {} {}", tag, e)
            }
        });
        versions.sort();

        let current = match versions.last() {
            None => {
                writeln!(
                    self.w.by_ref(),
                    "{} (pattern: {})",
                    "version tag not found".red(),
                    self.prefix.as_deref().unwrap_or("")
                )?;
                return Ok(());
            }
            Some(v) => v,
        };

        let mut bumped = current.clone();
        match self.prompt_bump(&current)? {
            Bump::Major => bumped.increment_major(),
            Bump::Minor => bumped.increment_minor(),
            Bump::Patch => bumped.increment_patch(),
        }

        if !self.confirm_bump(&current, &bumped)? {
            writeln!(self.w.by_ref(), "canceled")?;
            return Ok(());
        }

        let tag_oid = self.create_tag(&bumped)?;
        debug!("create tag(object_id: {})", tag_oid);

        self.push_tag(&bumped)
    }

    fn parse_tags(
        &mut self,
        tags: git2::string_array::StringArray,
    ) -> (Vec<Version>, Vec<(String, SemVerError)>) {
        let (versions, errs): (Vec<_>, Vec<_>) = tags
            .iter()
            .flatten()
            .map(|tag| tag.trim_start_matches(self.prefix.as_deref().unwrap_or("")))
            .map(|tag| Version::parse(tag).map_err(|err| (tag.to_owned(), err)))
            .partition(StdResult::is_ok);
        (
            versions.into_iter().map(StdResult::unwrap).collect(),
            errs.into_iter().map(StdResult::unwrap_err).collect(),
        )
    }

    fn prompt_bump(&mut self, current: &Version) -> Result<Bump> {
        let selections = &["major", "minor", "patch"];
        let select = dialoguer::Select::with_theme(&ColorfulTheme::default())
            .with_prompt(&format!("select bump version (current: {})", current))
            .default(0)
            .items(&selections[..])
            .interact()
            .unwrap();
        let bump = match select {
            0 => Bump::Major,
            1 => Bump::Minor,
            2 => Bump::Patch,
            _ => unreachable!(),
        };
        Ok(bump)
    }

    fn confirm_bump(&mut self, current: &Version, bumped: &Version) -> Result<bool> {
        let branch_name = git2::Branch::wrap(self.repo.head()?)
            .name()?
            .unwrap_or("")
            .to_owned();

        let head = self.repo.head()?.peel_to_commit()?;
        let w = self.w.by_ref();
        writeln!(w, "current HEAD")?;
        writeln!(w, "  branch : {}", branch_name)?;
        writeln!(w, "  id     : {}", head.id())?;
        writeln!(w, "  summary: {}", head.summary().unwrap_or(""))?;
        writeln!(w, "")?;
        dialoguer::Confirmation::new()
            .with_text(&format!(
                "bump version {prefix}{current} -> {prefix}{bumped}",
                prefix = format!("{}", self.prefix.as_deref().unwrap_or(""))
                    .red()
                    .bold(),
                current = format!("{}", current).red().bold(),
                bumped = format!("{}", bumped).red().bold(),
            ))
            .default(false)
            .interact()
            .map_err(anyhow::Error::from)
    }

    fn create_tag(&mut self, version: &Version) -> Result<git2::Oid> {
        let head = self.repo.head()?;
        if !head.is_branch() {
            return Err(anyhow!("HEAD is not branch"));
        }
        let obj = head.peel(git2::ObjectType::Commit)?;
        let signature = self.repo.signature()?;
        self.repo
            .tag(&format!("v{}", version), &obj, &signature, "", false)
            .map_err(anyhow::Error::from)
    }

    fn push_tag(&mut self, version: &Version) -> Result<()> {
        let mut origin = self.repo.find_remote("origin")?;

        let mut push_options = git2::PushOptions::new();
        let mut cb = git2::RemoteCallbacks::new();
        cb.transfer_progress(|_progress| {
            debug!(
                "called progress total_objects: {}",
                _progress.total_objects()
            );
            true
        })
        .push_update_reference(|reference, msg| {
            match msg {
                Some(err_msg) => println!("{}", err_msg.yellow()),
                None => println!("successfully pushed origin/{}", reference),
            }
            Ok(())
        })
        .credentials(|url, username_from_url, allowed_types| {
            debug!(
                "credential cb url:{} username_from_url:{:?} allowed_type {:?}",
                url, username_from_url, allowed_types
            );
            if allowed_types.contains(git2::CredentialType::USER_PASS_PLAINTEXT) {
                let user_name = match username_from_url {
                    Some(u) => Some(Cow::from(u)),
                    None => match self.user_name() {
                        Ok(Some(u)) => Some(u),
                        _ => None,
                    },
                };
                return match git2::Cred::credential_helper(&self.cfg, url, user_name.as_deref()) {
                    Ok(cred) => {
                        debug!("credential helper success");
                        Ok(cred)
                    }
                    Err(err) => {
                        debug!("{}", err);
                        // TODO: cache user credential to avoid prompt every time if user agree.
                        let cred = prompt_userpass()
                            .map_err(|_| git2::Error::from_str("prompt_userpass"))?;
                        git2::Cred::userpass_plaintext(&cred.0, &cred.1)
                    }
                };
            }
            // TODO: currently only USER_PASS_PLAINTEXT called :(
            git2::Cred::ssh_key_from_agent("xxx")
        });

        push_options.remote_callbacks(cb);

        let ref_spec = format!("refs/tags/v{0}:refs/tags/v{0}", version);
        debug!("refspec: {}", ref_spec);

        origin
            .push(&[&ref_spec], Some(&mut push_options))
            .map_err(anyhow::Error::from)
    }

    fn user_name(&self) -> Result<Option<Cow<str>>> {
        for entry in &self.cfg.entries(Some("user*"))? {
            if let Ok(entry) = entry {
                debug!("found {:?} => {:?}", entry.name(), entry.value());
                return Ok(entry.value().map(|v| Cow::Owned(String::from(v))));
            }
        }
        Ok(None)
    }
}

fn prompt_userpass() -> Result<(String, String)> {
    let username = dialoguer::Input::<String>::new()
        .with_prompt("username")
        .interact()?;
    let password = dialoguer::PasswordInput::new()
        .with_prompt("password")
        .interact()?;
    Ok((username, password))
}

`Bumper::bump() がentry pointです。流れとしては、まずgit2::Repository::open_from_env()Repositoryを取得します。 このRepositoryに各処理の起点となるmethodが定義されています。 今回は、tagの一覧がほしいので、Repository::tag_names() を呼び出し、StringArrayを取得します。 &'a StringArrayIteratorを実装しているので、perseしてsemantic versionの一覧に変換します。 ユーザにbumpするversionを選択してもらったあとは、bumpされたversionでtagを作成します。

    fn create_tag(&mut self, version: &Version) -> Result<git2::Oid> {
        let head = self.repo.head()?;
        if !head.is_branch() {
            return Err(anyhow!("HEAD is not branch"));
        }
        let obj = head.peel(git2::ObjectType::Commit)?;
        let signature = self.repo.signature()?;
        self.repo
            .tag(&format!("v{}", version), &obj, &signature, "", false)
            .map_err(anyhow::Error::from)
    }

Repository::tag()が定義されているので、tagを作るのはこれを呼ぶのかなと思い、docをみてみると以下のように定義されています。

pub fn tag(
    &self,
    name: &str,
    target: &Object,
    tagger: &Signature,
    message: &str,
    force: bool
) -> Result<Oid, Error>

ここで、Objectなる聞き慣れない型がでてきました。ここで、git objectで検索すると公式?のChapter 10 Git Internalの記事がでてきました。 どうやら、Gitは内部的に、key-value storeを備えており、valueはblobとして保持しているようです。このblobを特定の型(データモデル)として扱うことをpeelと呼んでいるみたいです。 ということで、cliからgit tag vX.Y.Zと実行したときはHEADが対象になることにならって、Repository::head() でHEADを取得するようにしてみました。

認証がやっかい

localにtagを作成したあとはremoteにpushするだけなのですが、ここがやっかいでした。 まず、Repository::find_remote()Remoteを取得し、Remote:push()を実行します。pushは以下のように定義されています。

pub fn push<Str: AsRef<str> + IntoCString + Clone>(
    &mut self,
    refspecs: &[Str],
    opts: Option<&mut PushOptions>
) -> Result<(), Error>

refspecsについては、refs/tags/vX.Y.Z:refs/tags/vX.Y.Zのように、refs以下をそのまま対応させたらうまくいきました。 次の引数、PushOptionsにcallbackとして認証処理をわたせるようになっています。

pub fn credentials<F>(&mut self, cb: F) -> &mut RemoteCallbacks<'a>
where
    F: FnMut(&str, Option<&str>, CredentialType) -> Result<Cred, Error> + 'a, 

ここでdocumentにあまり情報がなく、困ったのですが、issuecargoのコメントから、callbackの第3引数(allowed_types)に応じて、git2::Credを生成して返せばよさそうだったので、git2::Cred::user_pass_plaintext()を実行したところ、githubに認証してもらえました。

毎回認証のpromptをだすのはさすがに煩わしいので、このあたりは改善したいです。 libgit2にもdocがあります。

まとめ

Rustからgitの処理を安全に(FFIをwrapしてもらった形で)扱える。

Rustでネットワークの速度を測ってみる

会社の回線が遅いなと感じ、具体的な数字を測ってみたくなったので、Rustでtcpのthroughputを計測するcliを作ってみました。 source codeはこちら

install

cargobrewでinstallできます。

brew

$ brew tap ymgyt/netspeed
$ brew install netspeed

cargo

cargo install netspeed

測ってみる

事前にEC2上でnetspeed server runを実行してserverを起動してあります。defaultではこのserverへの接続を試みます。

netspeed  
INFO  Connecting to "netspeed.ymgyt.io:5555"
INFO  Start downstream duration: 3 seconds
INFO  Start upstream duration: 3 seconds
Downstream: 24.00 Mbps
  Upstream: 122.67 Mbps

同時実行数に制限をかけているので、上限をこえて同時に実行するとエラーになります。

while true; do netspeed &; done
ERROR Server decline speed test. Cause: max threads exceeded(100)

測り方としては、tcp接続ができたら、以下のようなおれおれプロトコルでserverにやりたいことを通知して、1MBのread/writeのloopを指定の時間内まわしています。

#[repr(u8)]
#[derive(Debug, Eq, PartialEq)]
pub enum Command {
    Ping = 1,
    RequestDownstream = 2,
    RequestUpstream = 3,
    SendBuffer = 4,
    Complete = 5,
    Ready = 6,
    Decline = 7,
    Close = 100,
}

impl From<Command> for u8 {
    fn from(cmd: Command) -> Self {
        match cmd {
            Command::Ping => 1,
            Command::RequestDownstream => 2,
            Command::RequestUpstream => 3,
            Command::SendBuffer => 4,
            Command::Complete => 5,
            Command::Ready => 6,
            Command::Decline => 7,
            Command::Close => 100,
        }
    }
}

impl TryFrom<u8> for Command {
    type Error = anyhow::Error;
    fn try_from(n: u8) -> Result<Self> {
        match n {
            1 => Ok(Command::Ping),
            2 => Ok(Command::RequestDownstream),
            3 => Ok(Command::RequestUpstream),
            4 => Ok(Command::SendBuffer),
            5 => Ok(Command::Complete),
            6 => Ok(Command::Ready),
            7 => Ok(Command::Decline),
            100 => Ok(Command::Close),
            _ => Err(anyhow!("Invalid number {} for command", n)),
        }
    }
}

https://github.com/ymgyt/netspeed/blob/master/src/command.rs

use crate::Result;
use anyhow::anyhow;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
    convert::{From, TryFrom},
    io::{Read, Write},
    net::{Shutdown, TcpStream},
    time::{self, Duration},
};

pub struct Operator {
    conn: TcpStream,
}

impl Operator {
    pub fn new(conn: TcpStream) -> Self {
        Self { conn }
    }
    pub fn ping_write_then_read(&mut self) -> Result<()> {
        self.write_ping().and(self.read_ping())
    }

    pub fn ping_read_then_write(&mut self) -> Result<()> {
        self.read_ping().and(self.write_ping())
    }

    fn write_ping(&mut self) -> Result<()> {
        self.write(Command::Ping)
    }

    fn read_ping(&mut self) -> Result<()> {
        self.expect(Command::Ping)
    }

    pub fn request_downstream(&mut self, duration: Duration) -> Result<()> {
        self.write(Command::RequestDownstream)
            .and_then(|_| self.write_duration(duration))
            .and_then(|_| self.flush())
    }

    pub fn request_upstream(&mut self, duration: Duration) -> Result<()> {
        self.write(Command::RequestUpstream)
            .and_then(|_| self.write_duration(duration))
            .and_then(|_| self.flush())
    }

    pub fn write_loop(&mut self, timeout: Duration) -> Result<u64> {
        let start = time::Instant::now();
        let mut write_bytes = 0u64;
        let buff = [0u8; crate::BUFFER_SIZE];
        loop {
            if start.elapsed() >= timeout {
                break;
            }
            self.send_buffer(&buff)?;
            write_bytes = write_bytes.saturating_add(crate::BUFFER_SIZE as u64);
        }
        self.write(Command::Complete)?;
        Ok(write_bytes)
    }

    pub fn read_loop(&mut self) -> Result<u64> {
        let mut buff = [0u8; crate::BUFFER_SIZE];
        let mut read_bytes = 0u64;
        loop {
            match self.read()? {
                Command::SendBuffer => {
                    self.receive_buffer(&mut buff)?;
                    read_bytes = read_bytes.saturating_add(crate::BUFFER_SIZE as u64);
                }
                Command::Complete => return Ok(read_bytes),
                _ => return Err(anyhow!("Unexpected command")),
            }
        }
    }

    pub fn send_buffer(&mut self, buff: &[u8]) -> Result<()> {
        self.write(Command::SendBuffer)?;
        Write::by_ref(&mut self.conn).write_all(buff)?;
        self.flush()
    }

    pub fn receive_buffer(&mut self, buff: &mut [u8]) -> Result<()> {
        Read::by_ref(&mut self.conn)
            .read_exact(buff)
            .map_err(anyhow::Error::from)
    }

    pub fn write(&mut self, cmd: Command) -> Result<()> {
        Write::by_ref(&mut self.conn)
            .write_u8(cmd.into())
            .map_err(anyhow::Error::from)
    }

    pub fn read(&mut self) -> Result<Command> {
        Command::try_from(Read::by_ref(&mut self.conn).read_u8()?)
    }

    pub fn write_duration(&mut self, duration: Duration) -> Result<()> {
        Write::by_ref(&mut self.conn)
            .write_u64::<BigEndian>(duration.as_secs())
            .map_err(anyhow::Error::from)
    }

    pub fn read_duration(&mut self) -> Result<Duration> {
        Read::by_ref(&mut self.conn)
            .read_u64::<BigEndian>()
            .map_err(anyhow::Error::from)
            .map(Duration::from_secs)
    }

    pub fn expect(&mut self, expect: Command) -> Result<()> {
        let actual = Command::try_from(Read::by_ref(&mut self.conn).read_u8()?)?;
        if actual != expect {
            Err(anyhow!(
                "Unexpected command. expect: {:?}, actual: {:?}",
                expect,
                actual
            ))
        } else {
            Ok(())
        }
    }

    pub fn write_decline(&mut self, reason: DeclineReason, shutdown: bool) -> Result<()> {
        self.write(Command::Decline)
            .and(self.write_decline_reason(reason))
            .and(self.flush())?;

        if shutdown {
            self.conn
                .shutdown(Shutdown::Both)
                .map_err(anyhow::Error::from)
        } else {
            Ok(())
        }
    }

    fn write_decline_reason(&mut self, reason: DeclineReason) -> Result<()> {
        let v = match reason {
            DeclineReason::MaxThreadsExceed(max_threads) => {
                // reason:32bit | description: 32bit
                let mut v: u64 = 1;
                v <<= 32;
                v += max_threads as u64;
                v
            }
            DeclineReason::Unknown => 0,
        };
        Write::by_ref(&mut self.conn)
            .write_u64::<BigEndian>(v)
            .map_err(anyhow::Error::from)
    }

    pub fn read_decline_reason(&mut self) -> Result<DeclineReason> {
        let v = Read::by_ref(&mut self.conn)
            .read_u64::<BigEndian>()
            .map_err(anyhow::Error::from)?;
        let reason = v >> 32;
        let detail = v & (std::u32::MAX as u64);
        if reason == 1 {
            Ok(DeclineReason::MaxThreadsExceed(detail as u32))
        } else {
            Ok(DeclineReason::Unknown)
        }
    }

    fn flush(&mut self) -> Result<()> {
        Write::by_ref(&mut self.conn)
            .flush()
            .map_err(anyhow::Error::from)
    }
}

localで試す

localで試すには以下のようにします。

terminal1

netspeed server run  
INFO  2020-02-15T13:01:56.892922+00:00 Listening on "0.0.0.0:5555" max threads: 100
INFO  2020-02-15T13:02:04.785769+00:00 Pass concurrent threads check. (0/100)
INFO  2020-02-15T13:02:04.786067+00:00 Handle incoming connection. dispatch worker 127.0.0.1:53013 actives: 1
INFO  2020-02-15T13:02:04.786480+00:00 (Worker:127.0.0.1:53013) => Handle downstream
INFO  2020-02-15T13:02:07.787380+00:00 (Worker:127.0.0.1:53013) => Successfully handle downstream
INFO  2020-02-15T13:02:07.787475+00:00 (Worker:127.0.0.1:53013) => Handle upstream
INFO  2020-02-15T13:02:10.840054+00:00 (Worker:127.0.0.1:53013) => Successfully handle upstream

terminal2

netspeed --addr 127.0.0.1:5555
INFO  Connecting to "127.0.0.1:5555"
INFO  Start downstream duration: 3 seconds
INFO  Start upstream duration: 3 seconds
Downstream: 19.93 Gbps
  Upstream: 20.39 Gbps

RustのcliをHomebrewで公開する

この記事ではRust製のcliをbrew installできるようになるまでの手順について書きます。(CI上で実行できるようにしたいのですが、まずは手でやります) サンプルとして利用するのは、前回の記事で紹介した、MySQLにpingをうつだけのcli, mysqlpingerです。

build

$ cd path/to/mysqlpinger
$ cargo build --release
$ cd target/release
$ tar -czf mysqlpinger-0.3.0-x86_64-apple-darwin.tar.gz mysqlpinger
$ shasum -a 256 mysqlpinger-0.3.0-x86_64-apple-darwin.tar.gz
f253213e27eaec55a9c58029de92c84c4bd11f311f77d273e15053a21ba5684c  mysqlpinger-0.3.0-x86_64-apple-darwin.tar.gz

--releaseでbuildして、tarにします。file名は他のprojectにならって、<binname>-<version>-<target>.tar.gzとしました。 また、あとで利用するので、hash値を取得しておきます。

github release

f:id:yamaguchi7073xtt:20200206202444p:plain
github release

buildしたbinaryを公開するために、githubのreleaseを利用します。新しいreleaseを作成し、さきほど作成したtar fileをuploadします。 releaseが作成されると、Assetsにリンクが生成されます。

Homebrew Formulaの作成

homebrew-mysqlpinger という名前のrepositoryをgithub上に作成して、cloneしてきます。

$ cd homebrew-mysqlpinger
$ exa -T
.
└── Formula
   └── mysqlpinger.rb

$ bat Formula/mysqlpinger.rb -p
class Mysqlpinger < Formula
  desc "cli mysql utility written in Rust"
  homepage "https://github.com/ymgyt/mysqlpinger"
  url "https://github.com/ymgyt/mysqlpinger/releases/download/v0.3.0/mysqlpinger-0.3.0-x86_64-apple-darwin.tar.gz"
  sha256 "f253213e27eaec55a9c58029de92c84c4bd11f311f77d273e15053a21ba5684c"
  version "0.3.0"

  def install
    bin.install "mysqlpinger"
  end
end

上記のようにFormula/mysqlpinger.rbを作成して、classを定義します。 urlには、github releaseのasset fileのlinkを指定します。 sha256にはtar fileのhash値を指定します。 (class名をMySQLPingerのようにするとエラーになりました)

ここまでを作成して、pushします。

install

$ brew tap ymgyt/mysqlpinger
$ brew install mysqlpinger
$ mysqlpinger --version
mysqlpinger 0.3.0

無事、installすることができました。🎉 github releaseの作成、fileのupload, formula repositoryの更新までをコマンドでできたらこの作業をci上でおこなえそうです。 Rust製でよさそうなツールを探して、なければ作ってみようと思います。

参考にさせていただいたブログ

https://federicoterzi.com/blog/how-to-publish-your-rust-project-on-homebrew/

RustでMySQLにpingをうつ

local環境でdocker-compose等を利用してDBを立ち上げた際に、DBの"ready"を待ちたいことがありました。 最初は、tcp接続でよしとしていたのですが、やはりprotocol的な"ready"が必要だったので、pingをうつ必要がありました。 ということで、RustでMySQLにpingを打ち続けるCLIを作ってみました。source code

flagは以下のような感じです。

$ mysqlpinger --help
mysqlpinger 0.2.1
Ping to mysql server

USAGE:
    mysqlpinger [FLAGS] [OPTIONS] [DBNAME]

ARGS:
    <DBNAME>    Database name [env: MYSQL_DB_NAME=]  [default: sys]

FLAGS:
    -s, --silent     Running with no logging
    -v, --verbose    Verbose
        --forever    Retry without limit
        --help       Prints help information
    -V, --version    Prints version information

OPTIONS:
    -h, --host <HOST>            MySQL server hostname [env: MYSQL_HOST=]  [default: 127.0.0.1]
    -p, --port <PORT>            MySQL server port [env: MYSQL_PORT=]  [default: 3306]
    -u, --user <USER>            User for authentication [env: MYSQL_USER=]  [default: root]
    -P, --pass <PASS>            Password for authentication [env: MYSQL_PASSWORD=]
    -m, --max-retry <COUNT>      Max retry count [default: 9]
    -i, --interval <DURATION>    Retry ping interval [default: 1s]

Example:
    # Basic
    mysqlpinger --pass=root --port=30303 <db_name>

    # Docker
    docker run --rm -t --network=<network> ymgyt/mysqlpinger:latest \
       --user=user --pass=secret --host=<container_name> [--forever|--max-retry=20]

接続のための情報とどれくらいretryするかを指定して、pingが通るまでblockします。

$ docker run --rm -t --network=network  ymgyt/mysqlpinger:latest --pass=secret --host=db --forever
INFO ping -> addr:db:3306 user:root db:sys
INFO 1/♾  Connection refused (os error 111)
INFO 2/♾  Connection refused (os error 111)
// ...
INFO 30/♾  Connection refused (os error 111)
INFO 31/♾  Connection refused (os error 111)
INFO OK (elapsed 31.152sec)

Connection処理

[dependencies]
lazy_static = "1.4.0"
mysql = "17.0.0"
parse_duration = "2.0"
colored = "1.9"
log = "0.4"
env_logger = "0.6"
console = "0.9.2"
ctrlc = "3.1.3"

[dependencies.clap]
git = "https://github.com/clap-rs/clap"
branch = "master"
use clap::ArgMatches;
use log::{debug, info};
use mysql::{Conn, Opts, OptsBuilder};
use parse_duration;
use std::{
    borrow::Cow,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

type BoxError = Box<dyn std::error::Error>;

pub struct MySQLPinger {
    opts: Arc<Opts>,
    interval: Duration,
    forever: bool,
    max_retry: u64,
    canceled: AtomicBool,
}

impl MySQLPinger {
    pub fn from_arg(m: &ArgMatches) -> Result<Self, BoxError> {
        let interval = parse_duration::parse(m.value_of("interval").unwrap())?;
        // we need OptsBuilder type first, then calling building methods
        let mut builder = OptsBuilder::default();
        builder
            .ip_or_hostname(m.value_of("host"))
            .tcp_port(
                m.value_of("port")
                    .unwrap()
                    .parse::<u16>()
                    .map_err(|e| format!("invalid port {}", e))?,
            )
            .user(m.value_of("user"))
            .pass(m.value_of("pass"))
            .prefer_socket(false)
            .db_name(m.value_of("dbname"))
            .tcp_connect_timeout(Some(interval));

        Ok(Self {
            opts: Arc::new(builder.into()),
            interval,
            forever: m.is_present("forever"),
            max_retry: m.value_of("max_retry").unwrap().parse()?,
            canceled: AtomicBool::new(false),
        })
    }

    pub fn stop(&self) {
        debug!("stop called");
        self.canceled.store(true, Ordering::Relaxed)
    }

    pub fn ping(&self) -> Result<(), BoxError> {
        info!(
            "ping -> addr:{host}:{port} user:{user} db:{db}",
            host = self.opts.get_ip_or_hostname().unwrap_or(""),
            port = self.opts.get_tcp_port(),
            user = self.opts.get_user().unwrap_or(""),
            db = self.opts.get_db_name().unwrap_or(""),
        );
        debug!(
            "interval:{interval:.1}sec attempt:{attempt}",
            interval = self.interval.as_secs_f64(),
            attempt = self.max_attempt_symbol(),
        );

        let mut attempt = 1;
        let max_attempt = self.max_retry + 1;
        loop {
            if !self.forever && attempt > max_attempt {
                return Err("Max retry count exceeded".into());
            }
            if self.canceled.load(Ordering::Relaxed) {
                return Err("Canceled".into());
            }

            use mysql::DriverError;
            use mysql::Error::*;
            let opts = Arc::clone(&self.opts);
            match Conn::new(Opts::clone(&opts)) {
                Ok(mut conn) => {
                    if conn.ping() {
                        return Ok(());
                    }
                }
                Err(DriverError(DriverError::CouldNotConnect(err))) => {
                    if let Some(err) = err {
                        let (_, description, _) = err;
                        info!("{}/{} {}", attempt, self.max_attempt_symbol(), description);
                    }
                }
                Err(DriverError(DriverError::ConnectTimeout)) => {
                    info!(
                        "{}/{} {}",
                        attempt,
                        self.max_attempt_symbol(),
                        "Connection timeout"
                    );
                }
                Err(err) => return Err(Box::new(err)),
            }

            thread::sleep(self.interval);
            attempt = attempt.wrapping_add(1);
        }
    }

    fn max_attempt_symbol(&self) -> Cow<'static, str> {
        if self.forever {
            "♾ ".into()
        } else {
            (self.max_retry + 1).to_string().into()
        }
    }
}

Connectionをはるために、mysql crateを利用しました。 issueでもあがっていたのですが

OptsBuilder::new().tcp_port().user()

のように、OptsBuilderを生成してそのまま、methodを呼ぶと、Optsに変換できずにハマってしまい、以下のようにしました。

        let mut builder = OptsBuilder::default();
        builder
            .ip_or_hostname(m.value_of("host"))    
            // ...
        Ok(Self {
            opts: Arc::new(builder.into()),
           // ...
        }

Dockerfile

task runner系のツールに書いて、開発者のlocal環境に依存しないようにするためにdocker imageにしました。

FROM rust:1.41.0 as builder

WORKDIR /usr/src/project

COPY . .

RUN cargo install --path .

ENTRYPOINT ["mysqlpinger"]

この書き方だと、image sizeがかなり大きくなってしまう(1.65GB)ので、multi stageで、buildとruntimeをわけられるにしたいです。

Go

最初はGoで書いていました。context.Contextのおかげで、timeout系の処理が非常に書きやすいと思いました。

package main

import (
    "context"
    "database/sql"
    "flag"
    "fmt"
    "os"
    "strings"
    "time"

    "github.com/go-sql-driver/mysql"
)

func main() {
    host := flag.String("host", "localhost", "host")
    port := flag.String("port", "3306", "port")
    user := flag.String("user", "root", "user")
    pass := flag.String("pass", "root", "pass")
    name := flag.String("name", "knight_db", "database name")
    rawTimeout := flag.String("timeout", "60s", "connection wait timeout")
    checkSlave := flag.Bool("check-slave", false, "check slave status")

    flag.Parse()
    start := time.Now()

    cfg := mysql.Config{
        User:                 *user,
        Passwd:               *pass,
        Net:                  "tcp",
        Addr:                 *host + ":" + strings.TrimLeft(*port, ":"),
        DBName:               *name,
        AllowNativePasswords: true,
    }

    db, err := sql.Open("mysql", cfg.FormatDSN())
    exitIfErr(err, 1)

    timeout, err := time.ParseDuration(*rawTimeout)
    exitIfErr(err, 1)
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    // connectionがはれない場合、以下のエラーが出続けてしまうので静かにしてもらう
    // [mysql] 2020/01/31 12:55:47 packets.go:36: unexpected EOF
    mysql.SetLogger(&NopLogger{})
    fmt.Printf("connecting to mysql(dsn: %s)\n", cfg.FormatDSN())
    err = waitReady(ctx, db)
    if err == nil {
        fmt.Printf("successfully connected to mysql(elapsed: %s)\n", time.Since(start))
    } else {
        fmt.Fprintf(os.Stderr, "\n%s\n", err.Error())
        os.Exit(2)
    }

    if *checkSlave {
        err := checkSlaveStatus(db)
        exitIfErr(err, 3)
    }
}

func waitReady(ctx context.Context, db *sql.DB) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        err := db.Ping()
        if err == nil {
            return nil
        }
        fmt.Printf(".")
        time.Sleep(time.Second)
    }
}

func checkSlaveStatus(db *sql.DB) error {
    fmt.Println("checking slave status...")
    rows, err := db.Query("SHOW SLAVE STATUS")
    if err != nil {
        return err
    }
    columns, err := rows.Columns()
    if err != nil {
        return err
    }

    values := make([]interface{}, len(columns))
    for rows.Next() {
        for i := 0; i < len(columns); i++ {
            var s sql.NullString
            values[i] = &s
        }
        err := rows.Scan(values...)
        if err != nil {
            return err
        }
    }

    ioRunning, sqlRunning := false, false
    for i, column := range columns {
        if column == "Slave_IO_Running" {
            ioRunning = values[i].(*sql.NullString).String == "Yes"
        }
        if column == "Slave_SQL_Running" {
            sqlRunning = values[i].(*sql.NullString).String == "Yes"
        }
    }

    fmt.Printf("Slave_IO_Running: %v SLave_SQL_Running: %v\n", ioRunning, sqlRunning)

    if !ioRunning || !sqlRunning {
        return fmt.Errorf("slave thread does not work :(")
    }
    return nil
}

func exitIfErr(err error, code int) {
    if err != nil {
        fmt.Fprintln(os.Stderr, err.Error())
        os.Exit(code)
    }
}

type NopLogger struct{}

func (l *NopLogger) Print(_ ...interface{}) {}

RustでRui Ueyama先生の低レイヤを知りたい人のためのCコンパイラ作成入門をやってみる1(環境構築から四則演算まで)

compilerbookこと低レイヤを知りたい人のためのCコンパイラ作成入門をRustでやっていきます。 本記事では、環境構築から四則演算のtestを通すところまでおこないます。compilerbookのステップ5: 四則演算のできる言語の作成までです。

概要

概要としては、計算の対象となる文字列(2*(3+4))を引数にとるCLIをRustで作成し、アセンブリを出力します。そのアセンブリをcompilerbookが提供してくださっているdocker上のgccでcompileして機械語を生成する流れとなります。 生成されたプログラムは計算結果を終了ステータスとして返します。

# 入力からアセンブリを生成するCLI
cargo run -- '2*(3+4)'
    Finished dev [unoptimized + debuginfo] target(s) in 0.01s
     Running `target/debug/r9cc '2*(3+4)'`
.intel_syntax noprefix
.global main
main:
  push 2
  push 3
  push 4
  pop rdi
  pop rax
  add rax, rdi
  push rax
  pop rdi
  pop rax
  imul rax, rdi
  push rax
  pop rax
  ret

# docker上で利用するためにcross compile
cross build --target x86_64-unknown-linux-musl

# build済のdocker上でtestを実行
docker run -it -v $(pwd):/ws -w /ws compilerbook ./test.sh

compilerbookにそって、(文字列->Token列)、(Token列 -> AST)、(AST -> アセンブリ)の3つの変換処理を行います。 main.rs

use r9cc::{gen, parse, tokenize, Error};
use std::{env, io, process};

fn main() {
    let result = env::args()
        .skip(1)
        .next()
        .ok_or(Error::InputRequired)
        .and_then(|input| tokenize(&input))
        .and_then(|tokens| parse(tokens))
        .and_then(|ast| gen(&mut io::stdout(), &ast));

    if let Err(e) = result {
        match e {
            Error::Lexer(e) => {
                eprintln!("{}\n{}", env::args().skip(1).next().unwrap(), e);
            }
            _ => eprintln!("{}", e),
        }

        process::exit(1);
    }
}

環境構築

環境構築といっても、必要なツールが揃っているDockerfileを準備していただいているので、Rust側だけです。 compilerbookでは9ccという名前で実装していくので、r9ccとしました。repositoryはこちらです

# clone
git clone https://github.com/ymgyt/r9cc

# 必要なツールをinstall
cargo install cargo-make
cargo install cross

# version
rustc -V
rustc 1.40.0 (73528e339 2019-12-16)

cross -V 
cross 0.1.16
cargo 1.40.0 (bc8e4c8be 2019-11-22)

# docker imageをbuild
cargo make image

# docker containerとinteractiveにやりとりしたい場合
# cargo make login

# r9ccをbuild
cargo make build

# testを実行
cargo make my_test

tokenize

環境もととのったので、処理の流れをおっていきます。まずは、入力文字列をtokenのVecに変換するlex部分からです。 この部分は、実践Rust入門の第9章パーサを作るを参考にさせていただきました。 https://github.com/ymgyt/r9cc/blob/7cad51b06d1ba4c1c73d5a01213128853bd115ff/src/lex/token.rs#L177

tokenは以下のように定義しました。

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Loc(pub usize, pub usize);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Annot<T> {
    pub value: T,
    pub loc: Loc,
}

impl<T> Annot<T> {
    fn new(value: T, loc: Loc) -> Self {
        Self { value, loc }
    }
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TokenKind {
    Number(u64), // [0-9][0-9]*
    Plus,        // '+'
    Minus,       // '-'
    Asterisk,    // '*'
    Slash,       // '/'
    LParen,      // '('
    RParen,      // ')'
    Eof,         // sentinel
}

impl TokenKind {
    pub(crate) fn is_number(&self) -> bool {
        match *self {
            TokenKind::Number(_) => true,
            _ => false,
        }
    }
}

pub type Token = Annot<TokenKind>;

tokenに位置に関する情報をもたせるために、Annotでwrapしています。これは実践Rust入門でおこなわれていた実装方法で、Genericsの使い方として非常に参考になりました。 Locの情報はエラー時に以下のような表示をだすために利用します。

./target/debug/r9cc '1 + 2 - aaa'  
1 + 2 - aaa
        ^ invalid char 'a'

tokenの生成はtokenize関数で行います。

#[derive(Debug)]
struct Input<'a> {
    input: &'a [u8],
    pos: Cell<usize>,
}

impl<'a> Input<'a> {
    fn new(s: &'a str) -> Self {
        Self {
            input: s.as_bytes(),
            pos: Cell::new(0),
        }
    }
    fn consume_byte(&self, want: u8) -> Result<usize> {
        self.peek().and_then(|got: u8| {
            if got != want {
                let pos = self.pos();
                Err(Error::invalid_char(got as char, Loc(pos, pos + 1)))
            } else {
                Ok(self.pos_then_inc())
            }
        })
    }
    fn consume_numbers(&self) -> Result<(usize, u64)> {
        let start = self.pos();
        self.consume(|b| b"0123456789".contains(&b));
        let n = std::str::from_utf8(&self.input[start..self.pos()])
            .unwrap()
            .parse()
            .unwrap();
        Ok((start, n))
    }
    fn consume_spaces(&self) {
        self.consume(|b| b" \n\t".contains(&b))
    }
    fn consume(&self, mut f: impl FnMut(u8) -> bool) {
        while let Ok(b) = self.peek() {
            if f(b) {
                self.inc();
                continue;
            }
            break;
        }
    }
    fn peek(&self) -> Result<u8> {
        self.input
            .get(self.pos())
            .map(|&b| b)
            .ok_or_else(|| self.eof())
    }
    fn pos(&self) -> usize {
        self.pos.get()
    }
    fn inc(&self) {
        self.pos.set(self.pos() + 1);
    }
    fn pos_then_inc(&self) -> usize {
        let pos = self.pos();
        self.inc();
        pos
    }
    fn eof(&self) -> Error {
        let pos = self.pos();
        Error::eof(Loc(pos, pos))
    }
}

pub fn tokenize(input: &str) -> StdResult<Stream, crate::Error> {
    let mut tokens = Vec::new();
    let input = Input::new(input);

    macro_rules! push {
        ($lexer:expr) => {{
            let tk = $lexer?;
            tokens.push(tk);
        }};
    }
    loop {
        match input.peek() {
            Err(e) => match e.value {
                ErrorKind::Eof => {
                    tokens.push(Token::eof(Loc(input.pos(), input.pos())));
                    return Ok(tokens);
                }
                _ => return Err(e.into()),
            },
            Ok(b) => match b {
                b'0'..=b'9' => push!(lex_number(&input)),
                b'+' => push!(lex_plus(&input)),
                b'-' => push!(lex_minus(&input)),
                b'*' => push!(lex_asterisk(&input)),
                b'/' => push!(lex_slash(&input)),
                b'(' => push!(lex_lparen(&input)),
                b')' => push!(lex_rparen(&input)),
                _ if (b as char).is_ascii_whitespace() => input.consume_spaces(),
                _ => {
                    return Err(
                        Error::invalid_char(b as char, Loc(input.pos(), input.pos() + 1)).into(),
                    )
                }
            },
        }
    }
}

fn lex_number(input: &Input) -> Result<Token> {
    input
        .consume_numbers()
        .map(|(pos, n)| Token::number(n, Loc(pos, input.pos())))
}

fn lex_plus(input: &Input) -> Result<Token> {
    input
        .consume_byte(b'+')
        .map(|pos| Token::plus(Loc(pos, pos + 1)))
}

fn lex_minus(input: &Input) -> Result<Token> {
    input
        .consume_byte(b'-')
        .map(|pos| Token::minus(Loc(pos, pos + 1)))
}

fn lex_asterisk(input: &Input) -> Result<Token> {
    input
        .consume_byte(b'*')
        .map(|pos| Token::asterisk(Loc(pos, pos + 1)))
}

fn lex_slash(input: &Input) -> Result<Token> {
    input
        .consume_byte(b'/')
        .map(|pos| Token::slash(Loc(pos, pos + 1)))
}

fn lex_lparen(input: &Input) -> Result<Token> {
    input
        .consume_byte(b'(')
        .map(|pos| Token::lparen(Loc(pos, pos + 1)))
}

fn lex_rparen(input: &Input) -> Result<Token> {
    input
        .consume_byte(b')')
        .map(|pos| Token::rparen(Loc(pos, pos + 1)))
}

演算子ならそのままtokenに、空白は無視して、数字なら途切れるまで読んでから変換を試みます。 入力文字列と何byteまで読んだかを一緒に引き回したかったのでInput型でwrapしています。入力文字列は&strで可変参照がとれないので、posをCellでwrapしています。 (これがCellの正しい使い方なのかはあまり自信がないです。)

ASTへの変換

続いて、token列をTree型のデータ構造に変換します。ASTを表すNodeは以下のように定義しました。

#[derive(Debug, PartialEq)]
pub enum Kind {
    Add,
    Sub,
    Mul,
    Div,
    Number(u64),
}

pub type Link = Option<Box<Node>>;

#[derive(Debug, PartialEq)]
pub struct Node {
    pub kind: Kind,
    pub lhs: Link,
    pub rhs: Link,
}

impl Node {
    pub fn new(kind: Kind, lhs: Link, rhs: Link) -> Node {
        Self { kind, lhs, rhs }
    }
    pub fn link(node: Node) -> Link {
        Some(Box::new(node))
    }
    pub fn number(n: u64) -> Node {
        Node::new(Kind::Number(n), None, None)
    }
}

CのpointerはOption<Box<T>>で代用しました。(もっといい方法があるかもしれません。) 今回は利用しませんでしたが、RustのpointerのひとつであるRcについては@qnighyさんのこちらの記事が大変参考になりました。

四則演算をparseするための文法規則は以下のように定義されています。

expr    = mul ("+" mul | "-" mul)*
mul     = primary ("*" primary | "/" primary)*
primary = num | "(" expr ")"

このexpr, mul, primaryそれぞれに対応する関数を定義します。

type Result<T> = StdResult<T, Error>;

pub fn parse(stream: Stream) -> StdResult<Node, crate::Error> {
    let mut tokens = stream.into_iter().peekable();
    expr(&mut tokens).map_err(|e| crate::Error::from(e))
}

fn expr<Tokens>(tokens: &mut Peekable<Tokens>) -> Result<Node>
where
    Tokens: Iterator<Item = Token>,
{
    let mut node = mul(tokens)?;
    loop {
        if consume(tokens, TokenKind::Plus)? {
            node = Node::new(Kind::Add, Node::link(node), Node::link(mul(tokens)?));
        } else if consume(tokens, TokenKind::Minus)? {
            node = Node::new(Kind::Sub, Node::link(node), Node::link(mul(tokens)?));
        } else {
            return Ok(node);
        }
    }
}

fn mul<Tokens>(tokens: &mut Peekable<Tokens>) -> Result<Node>
where
    Tokens: Iterator<Item = Token>,
{
    let mut node = primary(tokens)?;
    loop {
        if consume(tokens, TokenKind::Asterisk)? {
            node = Node::new(Kind::Mul, Node::link(node), Node::link(primary(tokens)?));
        } else if consume(tokens, TokenKind::Slash)? {
            node = Node::new(Kind::Div, Node::link(node), Node::link(primary(tokens)?));
        } else {
            return Ok(node);
        }
    }
}

fn primary<Tokens>(tokens: &mut Peekable<Tokens>) -> Result<Node>
where
    Tokens: Iterator<Item = Token>,
{
    let node = if consume(tokens, TokenKind::LParen)? {
        let node = expr(tokens)?;
        expect(tokens, TokenKind::RParen)?;
        node
    } else {
        Node::number(expect_number(tokens)?)
    };
    Ok(node)
}

fn consume<Tokens>(tokens: &mut Peekable<Tokens>, kind: TokenKind) -> Result<bool>
where
    Tokens: Iterator<Item = Token>,
{
    let peek = tokens.peek();
    if peek.is_none() {
        return Ok(false);
    }
    let peek = peek.unwrap();
    if peek.is_kind(kind) {
        tokens.next();
        Ok(true)
    } else {
        Ok(false)
    }
}

fn expect<Tokens>(tokens: &mut Peekable<Tokens>, kind: TokenKind) -> Result<()>
where
    Tokens: Iterator<Item = Token>,
{
    let peek = tokens.peek();
    if peek.is_none() {
        return Err(Error::Eof);
    }
    let peek = peek.unwrap();
    if peek.is_kind(kind) {
        tokens.next();
        Ok(())
    } else {
        Err(Error::UnexpectedToken(peek.clone()))
    }
}

fn expect_number<Tokens>(tokens: &mut Peekable<Tokens>) -> Result<u64>
where
    Tokens: Iterator<Item = Token>,
{
    let peek = tokens.peek();
    if peek.is_none() {
        return Err(Error::Eof);
    }
    let peek = peek.unwrap();
    match peek.value {
        TokenKind::Number(n) => {
            tokens.next();
            Ok(n)
        }
        _ => Err(Error::UnexpectedToken(peek.clone())),
    }
}

優先度の高い演算子ほど先に処理されて、木の深い位置におかれるようになっています。(文法規則をそのままコードにして動くのは感動します。)

アセンブリの生成

ASTのparse処理によって、計算の優先度は木の深さで表現されているので、一番深いところから、演算子に対応する命令を実行して計算結果をスタックに保持するようなアセンブリを出力します。

pub fn gen<W: Write>(w: &mut W, node: &Node) -> Result<(), crate::Error> {
    pre_gen(w)
        .and_then(|_| main_gen(w, node))
        .and_then(|_| post_gen(w))
        .map_err(|e| crate::Error::from(e))
}

fn pre_gen<W: Write>(w: &mut W) -> io::Result<()> {
    write!(
        w,
        ".intel_syntax noprefix\n\
         .global main\n\
         main:\n",
    )
}

fn main_gen<W: Write>(w: &mut W, node: &Node) -> io::Result<()> {
    if let NodeKind::Number(n) = node.kind {
        write!(w, "  push {}\n", n)?;
        return Ok(());
    }

    main_gen(w, &node.lhs.as_ref().unwrap())?;
    main_gen(w, &node.rhs.as_ref().unwrap())?;

    write!(w, "  pop rdi\n")?;
    write!(w, "  pop rax\n")?;

    match node.kind {
        NodeKind::Add => write!(w, "  add rax, rdi\n")?,
        NodeKind::Sub => write!(w, "  sub rax, rdi\n")?,
        NodeKind::Mul => write!(w, "  imul rax, rdi\n")?,
        NodeKind::Div => {
            write!(w, "  cqo\n")?;
            write!(w, "  idiv rdi\n")?;
        }
        _ => unreachable!(),
    }

    write!(w, "  push rax\n")?;

    Ok(())
}

fn post_gen<W: Write>(w: &mut W) -> io::Result<()> {
    write!(w, "{}{}", "  pop rax\n", "  ret\n")
}

(divだけx86-64の仕様が特殊で追加の処理が必要らしいです) 数値ならstackにpushして、演算子ならpopを2回おこなってから計算して結果をstackに書き戻すシンプルな処理です。

test

ここまでで、入力文字列からアセンブリを出力できるようになりました。

./target/debug/r9cc '6/2*(3+4)'   
.intel_syntax noprefix
.global main
main:
  push 6
  push 2
  pop rdi
  pop rax
  cqo
  idiv rdi
  push rax
  push 3
  push 4
  pop rdi
  pop rax
  add rax, rdi
  push rax
  pop rdi
  pop rax
  imul rax, rdi
  push rax
  pop rax
  ret

アセンブリから機械語への変換はdocker上でおこない、testを走らせます。 test用のscriptはcompilerbookのものを実行pathを修正して利用しました。

#!/bin/bash

CMD=./target/x86_64-unknown-linux-musl/debug/r9cc
TARGET=./target/tmp

mkdir -p "${TARGET}"

try() {
  expected="$1"
  input="$2"

  ${CMD} "$input" > "${TARGET}/tmp.s"
  gcc -o "${TARGET}/tmp" "${TARGET}/tmp.s"
  "${TARGET}/tmp"
  actual="$?"

  if [ "$actual" = "$expected" ]; then
    echo "$input => $actual"
  else
    echo "$input => $expected expected, but got $actual"
    exit 1
  fi
}

try 0 0
try 100 100
try 2 '1+1'
try 21 '3*(9-2)'
try 14 '(3+3)+2*(5-1)'

echo OK

dockerのbase imageはubuntuなので、crossを利用してcross compileを実行してから、docker上で上記のtest.shを実行します。

[loc rs/r9cc] cargo make build 
[cargo-make] INFO - cargo make 0.25.0
[cargo-make] INFO - Project: r9cc
[cargo-make] INFO - Build File: Makefile.toml
[cargo-make] INFO - Task: build
[cargo-make] INFO - Profile: development
[cargo-make] INFO - Running Task: init
[cargo-make] INFO - Running Task: build
[cargo-make] INFO - Execute Command: "cross" "build" "--target" "x86_64-unknown-linux-musl"
   Compiling r9cc v0.1.0 (/project)
    Finished dev [unoptimized + debuginfo] target(s) in 11.71s
[cargo-make] INFO - Running Task: end
[cargo-make] INFO - Build Done in 13 seconds.
[loc rs/r9cc] cargo make my_test   
[cargo-make] INFO - cargo make 0.25.0
[cargo-make] INFO - Project: r9cc
[cargo-make] INFO - Build File: Makefile.toml
[cargo-make] INFO - Task: my_test
[cargo-make] INFO - Profile: development
[cargo-make] INFO - Running Task: init
[cargo-make] INFO - Running Task: my_test
0 => 0
100 => 100
1+1 => 2
3*(9-2) => 21
(3+3)+2*(5-1) => 14
OK
[cargo-make] INFO - Running Task: end
[cargo-make] INFO - Build Done in 2 seconds.

まとめ

環境構築から四則演算までを行いました。次回はステップ6 単項プラスと単項マイナスからを予定しています。 このようなすばらしいドキュメントを無料で公開してくださっているRui Ueyama先生、ありがとうございます。(なんらかの形で販売されることがありましたら必ず購入します)

参考

  • compilerbook 低レイヤを知りたい人のためのCコンパイラ作成入門

Rui Ueyama ruiu@cs.stanford.edu

https://www.sigbus.info/compilerbook

  • 実践Rust入門

κeen , 河野 達也 , 小松 礼人

https://www.amazon.co.jp/dp/B07QVQ7RDG

Kubernetes Meetup Tokyo #23にいってきました

2019年9月27日に開催されたKubernetes Meetup Tokyo #23にブログ枠で参加させていただいたので、その模様について書いていきます。

会場

会場は渋谷ストリームの隣に誕生した渋谷スクランブルスクエアです。ビルの正式オープンが11月からとのことで絶賛内装工事中でした。 渋谷駅直結でとても便利な立地です。スポンサーはCyberAgent社です。

f:id:yamaguchi7073xtt:20190928172718j:plainf:id:yamaguchi7073xtt:20190928172803j:plainf:id:yamaguchi7073xtt:20190928172807j:plain
Session後の懇談会の様子

Session

ゼロから始めるKubernetes Controller

技術書典7で買わせていただいた、実践入門Kubernetes カスタムコントローラへの道の著者であられるバルゴさんによるKubernetes Controllerについての発表。

f:id:yamaguchi7073xtt:20190928180008j:plain speakerdeck.com

Controllerの概要から内部の実装についてまで説明されている100P超えの力作スライドです。 ReplicaSetをapplyしてからデリバリされるまでの処理を該当ソースへの参照箇所まで添えて説明してくれており開始10分でこれてよかったと思えました。

kubebuilder/controller-runtime入門 with v2 updates

kfyharukzさんによるkubebuilder v2の紹介/デモ。

kubernetesのAPIを拡張するためのSDK kubebuilderについての発表です。

www.slideshare.net

Kubernetesのbuiltin Resouceの理解もまだあやしい自分にとっては発展的な内容でした。CKA合格できたらCustom Resourceにも挑戦したいです。

Kubernetes 1.16: SIG-API Machineryの変更内容

Introduction of Operator Frameworkが行われる予定でしたが、発表者の方が体調不良とのことで内容が変更になりました。

LadicleさんによるKubernetes 1.16: SIG-API Machineryの変更内容qiita.com そもそもSIGという単語がわかっていませんでした。調べてみたところKubernetes Projectの開発単位という感じでしょうか。 興味があるSIGから追いかけてみるととっかかりとしてはよいというアドバイスもいただきました。

github.com

zlab社がQiitaでKubernetesやGo関連で参考になる記事をたくさんあげてくれているのでフォローしていこうと思いました。

qiita.com

Kubernetes 1.16: SIG-CLI の変更内容

同じくzlab社すぱぶらさんによるKubernetes 1.16: SIG-CLI の変更内容です。 qiita.com

kubectlの実践的な解説で自分が打ったことがないコマンドやオプションばかりでした。kubectl debug は利用できたらとても便利そうなので待ち遠しいです。

LT

ClusterAPI v1alpha1 → v1alpha2

r_takaishiさんによるClusterAPIについて。 発表資料のリンクが見つけられず。 ClusterAPIについてはまったくわかっておらず。そもそもKubernetesってCluster(特にcontroll plane)は作成されている前提だと考えていたので気になるところです。 技術書典7では買えていなかったはじめるCluster API読んで出直します。

f:id:yamaguchi7073xtt:20190928193713j:plain

自動化はshell-operator とともに。

nwiizoさんさんによるshell(bash)とkubernetesについて。 そもそも、operatorについてのわかっていないので、なんかbashでもkubernetesの処理にはさめるんだなくらいの理解しかできませんでした。 ここは議論のあるところだと思いますが、自分は以下の点からあまりbashを本番関連の処理に組み込みたくないと考えているのですがどうなんでしょうか。

  • network処理には必ずtimeout設定したい
  • error handling
  • logging(structured, leveling,...)
  • signal handling(gracefully shutdown, resource cleanup等)
  • test書きたい
  • 依存を明示

(bashでできないことはないと思うのですが上記のことやろうとすると肥大化するかscriptの手軽さが結局失われる)

自作 Controller による Secret の配布と収集 - unblee

unbleeさんによる speakerdeck.com

wantedly社でのKubernetes運用上の課題をControllerを作成して解決されたお話でした。 1-MicroService 1-Namespaceで運用されているそうです、実運用されている方々のNamespaceの切り方についてはもっとお話を伺ってみたいと思いました。

懇談会

SessionとLTの間に30分程度の懇談会の時間が設けられています。(飲食物の提供はCyberAgent社!) たまたま技術書典7で買って、当日も読んでいたカスタムコントローラへの道の著者のバルゴさんが発表されていたので、本買いましたとご挨拶させていただきました。またCKA、CKADをお持ちとのことで、CKAのアドバイスも聞けました。

qiita.com

Ladicleさんの発表の際に用いられていたQiitaのiconどこかでみたことあるなー思っていたのですが、Software Design 2017年9月号の特集 Web技術【超】入門いま一度振り返るWebのしくみと開発方法でweb serverの実装をgoでやられている方であることを思い出しました。 go-bindataで静的アセットファイルをgoのバイナリーに組み込むやり方をここではじめて知りました。

次回

次回は10月24日で、Kubernetes上で動かすアプリケーション開発のデバックとテストについてだそうです。 こちらも楽しみですね。

まとめ

Kubernetes Meetup Tokyoには初参加で、自分のKubernetesについての理解が、GKEにslack botをdeployしてみる程度*1でしたがとても勉強になり参加してよかったです。

せっかくなのでゼロから始めるKubernetes Controllerをおってみる

ゼロから始めるKubernetes Controllerで、ReplicaSetをapplyした際のControllerの挙動が該当コードのリンクつきで解説されているので、追えるところまでおってみました。kubernetesのversionは1.16です

kubectl apply

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_18.jpg?13730928 https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=19

kubectlでReplicaSetをapplyして、api-serverで処理されている前提です。

ReplicaSet Controllerの起動

slideでは、ReplicaSetControllerReplicaSetが生成されたことを検知したところから解説されていますが、起動処理を簡単におってみました。

ReplicaSetController自体は、kube-controller-manager binaryに含まれているので、起動処理はkube-controller-managerコマンドから始まります。

// Run runs the KubeControllerManagerOptions.  This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
  // ...
  if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
            klog.Fatalf("error starting controllers: %v", err)
    }
  // ...
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/controllermanager.go#L234

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
  controllers := map[string]InitFunc{}
  // ...
  controllers["replicaset"] = startReplicaSetController
  // ...
  return controllers
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/controllermanager.go#L386

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
   // ...
  for controllerName, initFn := range controllers {
        // ...
        debugHandler, started, err := initFn(ctx)
        // ...
  }

  return nil

kubernetes/controllermanager.go at release-1.16 · kubernetes/kubernetes · GitHub

各controllerの起動処理を実行しているようです。 肝心のReplicaSetControllerの起動処理をみてみます。

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    // ...
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/cmd/kube-controller-manager/app/apps.go#L69:6

ReplicaSetControllerの生成と起動処理を別goroutineで実行しているようです。

func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    // ...
    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L177:34

指定された数のworkerを起動して、stopChでblockしています。

func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L432:34

肝心のworkerはqueueからtaskを取得して、ReplicaSetController.syncHandler()処理を呼び出しています。 このqueueまわりもslideの後半で解説されていましたが、概要としてはapi-serverからcontrollerが関心のあるEventに絞って取得していると理解しています。

func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    //...
    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L163

ReplicaSetController.syncHandlerにはsyncReplicaSetが生成処理時にセットされています。

// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    // ...
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    // ...
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    // ...
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    // ...
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    // ...

    // Ignore inactive pods.
    filteredPods := controller.FilterActivePods(allPods)

    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    // ...

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    // ...

概要としては、処理対象のReplicaSetとfilterlingしたPodを取得して、ReplicaSetController.manageReplicas()を呼んでいます。 これでようやくslideの最初の処理のたどり着きました。

ReplicaSetController.manageReplicas()

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_19.jpg?13730929 https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=20

// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    // ...
    if diff < 0 {
        diff *= -1
        // ...
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil && errors.IsTimeout(err) {
                // ...
                return nil
            }
            return err
        })
        // ...

        return err

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L459:34

func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        curSuccesses := batchSize - len(errCh)
        successes += curSuccesses
        if len(errCh) > 0 {
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L658:6

PodReplicaSetのreplica数の差分をとって、Podの作成処理を実行していますね。 slowStartBatch()は作成処理を並列で走らせるhelper関数のようです。 段階的に一度に起動するgoroutineの数を増やしていく処理の書き方として非常に参考になります。(IntMin()のような処理はstd libで欲しいと思ってしまう) ReplicaSetController.podControlはinterfaceで、実際のPod作成処理はReadPodControlが実装しています。

func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
    if err := validateControllerRef(controllerRef); err != nil {
        return err
    }
    return r.createPods("", namespace, template, controllerObject, controllerRef)
}

kubernetes/controller_utils.go at 2f76f5e63872a40ac08056289a6c52b4f6250154 · kubernetes/kubernetes · GitHub

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
    pod, err := GetPodFromTemplate(template, object, controllerRef)
    // ...
    if len(nodeName) != 0 {
        pod.Spec.NodeName = nodeName
    }
    // ...
    newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
    if err != nil {
        r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
        return err
    }
    // ...
    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/controller_utils.go#L567

確かにslideのとおり、r.createPods("", namespace, template, controllerObject, controllerRef) としてnodeNameが空のPodを生成しているのがわかります。

Schedulerがenqueue

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_20.jpg?13730930

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=21

func AddAllEventHandlers(...) {
    // ...
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, schedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToSchedulingQueue,
                UpdateFunc: sched.updatePodInSchedulingQueue,
                DeleteFunc: sched.deletePodFromSchedulingQueue,
            },
        },
    )

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/eventhandlers.go#L418

func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/eventhandlers.go#L323:6

SchedulerpodInformerにevent handlerを登録する際に、podがassigne(NodeNameが設定されている)されていないことを条件とするfilterを設定していることがわかります。

kubeletはskip

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_21.jpg?13730931

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=22

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/config/apiserver.go#L32

kubeletのapi serverへのclient生成処理時に、PodのHost名が(おそらく)自身のnode名と一致するFilterを設定しているようです。

Schedulerがnode nameを設定

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_22.jpg?13730932

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=23

func (sched *Scheduler) scheduleOne() {
    // ...
    pod := sched.NextPod()
    // ...
    scheduleResult, err := sched.schedule(pod, pluginContext)
    // ...
    assumedPod := pod.DeepCopy()
    // ...

    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    // ...
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/scheduler.go#L516

func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // ...
    assumed.Spec.NodeName = host
    // ...
    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/scheduler/scheduler.go#L447:25

Scheduling処理自体は一大Topicですが、流れとしては、なんらかの方法でNode名を選出して、Podのnode名に指定していることがわかります。

kubeletがコンテナを起動

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_23.jpg?13730933

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=24

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
  // ...
    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", container); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }
    // ...

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L803

kubeletの処理はまったく追えていないのですが、コンテナを起動しているような処理を実行しています。

kubeletPodのstatusを更新

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_24.jpg?13730934

func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // pull out the required options
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType

    // ...

    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kubelet.go#L1481

PodのTerminating

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_25.jpg?13730935

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=26

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_26.jpg?13730936

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=27

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            // If the pod is in a terminated state, there is no pod worker to
            // handle the work item. Check if the DeletionTimestamp has been
            // set, and force a status update to trigger a pod deletion request
            // to the apiserver.
            kl.statusManager.TerminatePod(pod)
        }
        return
    }
    // ...

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/kubelet/kubelet.go#L1999

ReplicaSetControllerPodを削除

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_27.jpg?13730937

https://speakerdeck.com/govargo/under-the-kubernetes-controller-36f9b71b-9781-4846-9625-23c31da93014?slide=28

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    // ...
    }
    if diff < 0 {
        // ...
    } else if diff > 0 {
        // ...
        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, diff)
        // ...
        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // ...
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }
    }

    return nil
}

https://github.com/kubernetes/kubernetes/blob/2f76f5e63872a40ac08056289a6c52b4f6250154/pkg/controller/replicaset/replica_set.go#L459

ここで再び、ReplicaSetController.manageReplicas()に戻ってきました。今度は、specよりも実際のPodが多いので、削除処理が走るようです。削除処理はシンプルに削除する数だけgoroutineを起動するようです。

Reconcile

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_28.jpg?13730938

https://speakerd.s3.amazonaws.com/presentations/173fdf8b50f445ba895b82fb6650825c/preview_slide_29.jpg?13730939

ここまで、非常に簡単にですがslideにそって、ReplicaSetControllerを中心に該当コードを追いかけてみました。 kubernetesのcodeを初めて読んだのですが、各componentの実装がだいたいどのあたりあるのかを知るためのとっかかりとして非常に参考になりました。slideの後半では、InformerWorkQueueについても解説されているので、是非そちらも追いかけてみようと思います。

*1:会社のブログで記事にしました。 blog.howtelevision.co.jp

nushell/bookに日本語翻訳のPRだしたらマージしてもらえました

f:id:yamaguchi7073xtt:20190907001301p:plain

nushell/bookというRust製shellのbook(document)に日本語翻訳のPRをだしたらmergeしてもらえました。

ドキュメントはこちらから読めます。

そもそもnushellとは A modern, GitHub-era shell written in Rustと謳っているとおり、Rustで書かれたshellです。主な特徴は、コマンド間の入出力(Stream)をテキストベースからデータ構造に拡張している点です。

https://raw.githubusercontent.com/nushell/nushell/master/images/nushell-autocomplete4.gif

その後、メインテナーの一人であられるjonathanさんからnushell/bookのrepositoryのメンバーに招待していただき、merge権限までもらえました。

ドキュメントだけではありますが、はじめてオープンソースのプロジェクトにPRをmergeしてもらえ、チームに招待してもらえたのはすごくうれしかったです。

今後は、nushell本体にRustのCodeでContributeすることを目指していきたいです。