RustでMySQLにpingをうつ

local環境でdocker-compose等を利用してDBを立ち上げた際に、DBの"ready"を待ちたいことがありました。 最初は、tcp接続でよしとしていたのですが、やはりprotocol的な"ready"が必要だったので、pingをうつ必要がありました。 ということで、RustでMySQLにpingを打ち続けるCLIを作ってみました。source code

flagは以下のような感じです。

$ mysqlpinger --help
mysqlpinger 0.2.1
Ping to mysql server

USAGE:
    mysqlpinger [FLAGS] [OPTIONS] [DBNAME]

ARGS:
    <DBNAME>    Database name [env: MYSQL_DB_NAME=]  [default: sys]

FLAGS:
    -s, --silent     Running with no logging
    -v, --verbose    Verbose
        --forever    Retry without limit
        --help       Prints help information
    -V, --version    Prints version information

OPTIONS:
    -h, --host <HOST>            MySQL server hostname [env: MYSQL_HOST=]  [default: 127.0.0.1]
    -p, --port <PORT>            MySQL server port [env: MYSQL_PORT=]  [default: 3306]
    -u, --user <USER>            User for authentication [env: MYSQL_USER=]  [default: root]
    -P, --pass <PASS>            Password for authentication [env: MYSQL_PASSWORD=]
    -m, --max-retry <COUNT>      Max retry count [default: 9]
    -i, --interval <DURATION>    Retry ping interval [default: 1s]

Example:
    # Basic
    mysqlpinger --pass=root --port=30303 <db_name>

    # Docker
    docker run --rm -t --network=<network> ymgyt/mysqlpinger:latest \
       --user=user --pass=secret --host=<container_name> [--forever|--max-retry=20]

接続のための情報とどれくらいretryするかを指定して、pingが通るまでblockします。

$ docker run --rm -t --network=network  ymgyt/mysqlpinger:latest --pass=secret --host=db --forever
INFO ping -> addr:db:3306 user:root db:sys
INFO 1/♾  Connection refused (os error 111)
INFO 2/♾  Connection refused (os error 111)
// ...
INFO 30/♾  Connection refused (os error 111)
INFO 31/♾  Connection refused (os error 111)
INFO OK (elapsed 31.152sec)

Connection処理

[dependencies]
lazy_static = "1.4.0"
mysql = "17.0.0"
parse_duration = "2.0"
colored = "1.9"
log = "0.4"
env_logger = "0.6"
console = "0.9.2"
ctrlc = "3.1.3"

[dependencies.clap]
git = "https://github.com/clap-rs/clap"
branch = "master"
use clap::ArgMatches;
use log::{debug, info};
use mysql::{Conn, Opts, OptsBuilder};
use parse_duration;
use std::{
    borrow::Cow,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

type BoxError = Box<dyn std::error::Error>;

pub struct MySQLPinger {
    opts: Arc<Opts>,
    interval: Duration,
    forever: bool,
    max_retry: u64,
    canceled: AtomicBool,
}

impl MySQLPinger {
    pub fn from_arg(m: &ArgMatches) -> Result<Self, BoxError> {
        let interval = parse_duration::parse(m.value_of("interval").unwrap())?;
        // we need OptsBuilder type first, then calling building methods
        let mut builder = OptsBuilder::default();
        builder
            .ip_or_hostname(m.value_of("host"))
            .tcp_port(
                m.value_of("port")
                    .unwrap()
                    .parse::<u16>()
                    .map_err(|e| format!("invalid port {}", e))?,
            )
            .user(m.value_of("user"))
            .pass(m.value_of("pass"))
            .prefer_socket(false)
            .db_name(m.value_of("dbname"))
            .tcp_connect_timeout(Some(interval));

        Ok(Self {
            opts: Arc::new(builder.into()),
            interval,
            forever: m.is_present("forever"),
            max_retry: m.value_of("max_retry").unwrap().parse()?,
            canceled: AtomicBool::new(false),
        })
    }

    pub fn stop(&self) {
        debug!("stop called");
        self.canceled.store(true, Ordering::Relaxed)
    }

    pub fn ping(&self) -> Result<(), BoxError> {
        info!(
            "ping -> addr:{host}:{port} user:{user} db:{db}",
            host = self.opts.get_ip_or_hostname().unwrap_or(""),
            port = self.opts.get_tcp_port(),
            user = self.opts.get_user().unwrap_or(""),
            db = self.opts.get_db_name().unwrap_or(""),
        );
        debug!(
            "interval:{interval:.1}sec attempt:{attempt}",
            interval = self.interval.as_secs_f64(),
            attempt = self.max_attempt_symbol(),
        );

        let mut attempt = 1;
        let max_attempt = self.max_retry + 1;
        loop {
            if !self.forever && attempt > max_attempt {
                return Err("Max retry count exceeded".into());
            }
            if self.canceled.load(Ordering::Relaxed) {
                return Err("Canceled".into());
            }

            use mysql::DriverError;
            use mysql::Error::*;
            let opts = Arc::clone(&self.opts);
            match Conn::new(Opts::clone(&opts)) {
                Ok(mut conn) => {
                    if conn.ping() {
                        return Ok(());
                    }
                }
                Err(DriverError(DriverError::CouldNotConnect(err))) => {
                    if let Some(err) = err {
                        let (_, description, _) = err;
                        info!("{}/{} {}", attempt, self.max_attempt_symbol(), description);
                    }
                }
                Err(DriverError(DriverError::ConnectTimeout)) => {
                    info!(
                        "{}/{} {}",
                        attempt,
                        self.max_attempt_symbol(),
                        "Connection timeout"
                    );
                }
                Err(err) => return Err(Box::new(err)),
            }

            thread::sleep(self.interval);
            attempt = attempt.wrapping_add(1);
        }
    }

    fn max_attempt_symbol(&self) -> Cow<'static, str> {
        if self.forever {
            "♾ ".into()
        } else {
            (self.max_retry + 1).to_string().into()
        }
    }
}

Connectionをはるために、mysql crateを利用しました。 issueでもあがっていたのですが

OptsBuilder::new().tcp_port().user()

のように、OptsBuilderを生成してそのまま、methodを呼ぶと、Optsに変換できずにハマってしまい、以下のようにしました。

        let mut builder = OptsBuilder::default();
        builder
            .ip_or_hostname(m.value_of("host"))    
            // ...
        Ok(Self {
            opts: Arc::new(builder.into()),
           // ...
        }

Dockerfile

task runner系のツールに書いて、開発者のlocal環境に依存しないようにするためにdocker imageにしました。

FROM rust:1.41.0 as builder

WORKDIR /usr/src/project

COPY . .

RUN cargo install --path .

ENTRYPOINT ["mysqlpinger"]

この書き方だと、image sizeがかなり大きくなってしまう(1.65GB)ので、multi stageで、buildとruntimeをわけられるにしたいです。

Go

最初はGoで書いていました。context.Contextのおかげで、timeout系の処理が非常に書きやすいと思いました。

package main

import (
    "context"
    "database/sql"
    "flag"
    "fmt"
    "os"
    "strings"
    "time"

    "github.com/go-sql-driver/mysql"
)

func main() {
    host := flag.String("host", "localhost", "host")
    port := flag.String("port", "3306", "port")
    user := flag.String("user", "root", "user")
    pass := flag.String("pass", "root", "pass")
    name := flag.String("name", "knight_db", "database name")
    rawTimeout := flag.String("timeout", "60s", "connection wait timeout")
    checkSlave := flag.Bool("check-slave", false, "check slave status")

    flag.Parse()
    start := time.Now()

    cfg := mysql.Config{
        User:                 *user,
        Passwd:               *pass,
        Net:                  "tcp",
        Addr:                 *host + ":" + strings.TrimLeft(*port, ":"),
        DBName:               *name,
        AllowNativePasswords: true,
    }

    db, err := sql.Open("mysql", cfg.FormatDSN())
    exitIfErr(err, 1)

    timeout, err := time.ParseDuration(*rawTimeout)
    exitIfErr(err, 1)
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    // connectionがはれない場合、以下のエラーが出続けてしまうので静かにしてもらう
    // [mysql] 2020/01/31 12:55:47 packets.go:36: unexpected EOF
    mysql.SetLogger(&NopLogger{})
    fmt.Printf("connecting to mysql(dsn: %s)\n", cfg.FormatDSN())
    err = waitReady(ctx, db)
    if err == nil {
        fmt.Printf("successfully connected to mysql(elapsed: %s)\n", time.Since(start))
    } else {
        fmt.Fprintf(os.Stderr, "\n%s\n", err.Error())
        os.Exit(2)
    }

    if *checkSlave {
        err := checkSlaveStatus(db)
        exitIfErr(err, 3)
    }
}

func waitReady(ctx context.Context, db *sql.DB) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        err := db.Ping()
        if err == nil {
            return nil
        }
        fmt.Printf(".")
        time.Sleep(time.Second)
    }
}

func checkSlaveStatus(db *sql.DB) error {
    fmt.Println("checking slave status...")
    rows, err := db.Query("SHOW SLAVE STATUS")
    if err != nil {
        return err
    }
    columns, err := rows.Columns()
    if err != nil {
        return err
    }

    values := make([]interface{}, len(columns))
    for rows.Next() {
        for i := 0; i < len(columns); i++ {
            var s sql.NullString
            values[i] = &s
        }
        err := rows.Scan(values...)
        if err != nil {
            return err
        }
    }

    ioRunning, sqlRunning := false, false
    for i, column := range columns {
        if column == "Slave_IO_Running" {
            ioRunning = values[i].(*sql.NullString).String == "Yes"
        }
        if column == "Slave_SQL_Running" {
            sqlRunning = values[i].(*sql.NullString).String == "Yes"
        }
    }

    fmt.Printf("Slave_IO_Running: %v SLave_SQL_Running: %v\n", ioRunning, sqlRunning)

    if !ioRunning || !sqlRunning {
        return fmt.Errorf("slave thread does not work :(")
    }
    return nil
}

func exitIfErr(err error, code int) {
    if err != nil {
        fmt.Fprintln(os.Stderr, err.Error())
        os.Exit(code)
    }
}

type NopLogger struct{}

func (l *NopLogger) Print(_ ...interface{}) {}