この記事ではRustの非同期runtimeのひとつtokioの公式Tutorialを通じてtokioのAPIに入門していきます。
TutorialではMini-RedisというRedisのclient/serverを実装したlibraryを通してtokioとfuture/asyncの概念を学んでいきます。Redisについての前提知識は必要とされていません。
Rustでasync/awaitが使えるになりましたが、実際にアプリケーションを書くにはruntimeを選択する必要があります。今だとtokioかasync-stdが現実的な選択肢なのでしょうか。非同期のruntimeを選択すると基本的にI/Oをともなう処理はすべて(?)選択したruntimeのAPIを利用することになると思います。そのため、Rustの非同期ecosystemの恩恵にあずかるにはruntime/tokioのAPIになれておく必要があります。
まとめ
本tutorialを通して以下のことを学べました。
std::sync::Mutex
とtokio::sync::Mutex
の使い分け方.await
と書いたときにどんなことがおきるかのメンタルモデルができるMutex
による状態共有からmpsc
とoneshot
channelを利用したパターンへの移行Frame
という概念(byte stream -> frame -> protocol)bytes::{BytesMut,Bytes}
の利用例- futureを
poll
するexecutorの概要 select!
でgoっぽく書けるStream
にはpin
どめが必要
サンプルコードが豊富でgithubではより細かくコメントが書いてあります。
準備
rustは1.47を利用しました、最新であれば特に問題ないと思います。minimumは1.39.0
です。 rustc 1.47.0 (18bf6b4f0 2020-10-07)
この記事を書いている1週間ほど前にtokio v0.3.0がリリースされました。ですがmini-redisはtokioの0.2に依存しているので0.2で進めていきます。 bytes
crateへの依存がpublic APIから削除されたので、read_buf()
-> read()
に変更する以外は特に影響ないです。
mini-redisもtokio v0.3を利用するようになりました。
Mini-Redis server
client側のコードを書く際にserverが起動していると便利なのでmini-redisを動かせるようにします。
$ cargo install mini-redis
$ mini-redis-server
cargo install
でもってくることもできますが、ソースから動かすことにします。
git clone https://github.com/tokio-rs/mini-redis
cd mini-redis
cargo run --bin mini-redis-server
別terminalで
cargo run --bin mini-redis-cli --quiet set xxx xxx
cargo run --bin mini-redis-cli --quiet get xxx
"xxx"
とできればOKです。
My-Redis
cargo new my-redis
cd my-redis
Cargo.toml
tokio = {version = "0.3.1", features = ["full"]}
mini-redis = "0.3"
projectを作成して、tokioとmini-redisを依存先に追加します。
楽をしてfeaturesにfull
を指定していますが、実際には利用する機能にあわせてnet
, fs
, rt-threaded
のように指定します。API documentに利用するために必要なfeatureが記載されています。
v0.3
ではrt-core
とrt-util
がrt
に、tcp
, udp
, dns
がnet
にまとめられる等して整理されています。1
これで準備が整ったので早速コードを書いていきましょう!
Sourceはこちら (branchがmasterからmainになっています。)
Hello Tokio
tutorialではmain.rs
に書いていますが残しておきたいのでexamples/hello-redis.rs
を作成します。
use ;
pub async
別terminalでmini-redisが起動してある前提で
$ cargo run --quiet --example hello-redis
got value from the server; result = Some(b"world")
となれば成功です。
Attribute Macro tokio::main
まずいきなりうっ..となったのがtokio::main
macroです。main関数からいきなり隠蔽されるのは抵抗ないでしょうか。ということでcargo expand
で展開内容をみていきます。
use *;
extern crate std;
use ;
このような展開結果となりました。概要としてはruntimeをBuilder patternで設定してユーザのコードをasync blockでwrapしたうえでblock_on()
に渡している感じです。
documentによるとRuntime
のセットアップをユーザがRuntime
やBuilder
を直接利用することなくできるようにするためのhelperという位置づけのようです。
v0.3.0
では
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
#[tokio::main(flavor = "current_thread")]
のようにmulti/single threadの切り替えやworker thread数を制御できるようです。
このruntimeがなにをやってくれているかは後述します。
ちなみにv0.3.0
では以下のように展開されました。
new_multi_thread
.enable_all
.build
.unwrap
.block_on
runtimeの設定処理の理解を試みたのですが歯がたちませんでした。
worker thread数は指定しない場合seanmonstar先生のnum_cpusが利用され論理コア数が使われているようです。
await
のメンタルモデル
async
fn/blockの中でawait
を書いたらなにが起きるのピンと来ていませんでした。なのでせめて概念的にでも.await
書いたら実はこうなってるというメンタルモデル(あくまで自分の)を得るのが目標でしたが、本tutorialを行い現状では以下のように考えています。
- async fn/blockは
Future::poll
2の実装に変換される。 Future
を実装したanonymous structは最終的にはtaskという形でruntimeに渡されてpoll()
を呼んでもらえる。future_a.await
と書いたコードは以下のように変換される。(詳しくは後述します)
let value = match future_as_mut.poll
- 結果的にユーザはfutureが完了したあとの処理だけを記述すればよく、非同期処理が同期処理っぽく書ける。
Spawning
client側のコードを書いたので次はserver側のコードを書いていきます。こちらはsrc/main.rs
に記述します。
use ;
use ;
async
async
このコードでは、process
が完了するまで次の接続を受け付けていないので同時に1つの接続しか処理されません。process
をconcurrentにおこなうには以下のようにします。
use TcpListener;
async
process
をawaitしているfutureをtokio::spawn
に渡します。tokio::spawn
はfutureの実行単位で自分はgo func() { ... }
のようなgoroutineと考えています。ただし、goroutineと違うのは戻り値を取得できるところです。(goroutineの場合はchan等を使う必要がある)
async
Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory. Applications should feel free to spawn thousands, if not millions of tasks.
ということでたった64bytesのallocationしか必要とせず、ある程度は気にせずにspawn
してもいいみたいです。
Concurrency and Parallelism
いわゆる並行と並列の違いについても言及があります。
Concurrency and parallelism is not the same thing. If you alternate between two tasks, then you are working on both tasks concurrently, but not in parallel. For it to qualify as parallel, you would need two people, one dedicated to each task.
concurrentとparallelの違いについてはGo言語による並行処理 2章並行性をどうモデル化するかで書かれている以下の文がいちばんわかりやすいと思っています。
並行性はコードの性質を指し、並列性は動作しているプログラムの性質を指します。
この本は本当に名著だと思い原文のConcurrency in Goも読みました。原文では以下のように書かれています。
Concurrency is a property of the code; parallelism is a property of the running program
そして
The first is that we do not write parallel code, only concurrent code that we hope will be run in parallel. Once again, parallelism is a property of the runtime of our program, not the code.
というわけで、コードでは並行かそうでないかだけが制御できるという姿勢でいるようになりました。
'static
bound
tokio::spawn
の定義は以下のようになっています。
ⓘ
where
T: Future + Send + 'static,
: Send + 'static,
Output
ここでT: 'static
となっているとそれは"lives forever"とよく誤解されているがそうではないという注意があります。(このCommon Rust Lifetime Misconceptionsは非常に参考になったので別で記事を書こうと思っています。)
T: 'static
はTの所有者はTを保持している限りデータが無効になることはないと保証されているので、プログラムの終了までを含めて無期限にデータを保持できると読めて、"T is bounded by a 'static lifetime"と読むべきで、"T has a 'static lifetime"と読まないと自分は理解しています。
要はT: 'static
だったらTはowned typeか'static lifetime
の参照しかfieldにもたない型ということ。
Send bound
tokio::spawn
に渡されるfutureはSend
をimplementしている必要がある。taskがSend
になるには、.await
をまたぐすべてのデータがSend
である必要がある。逆にいうと.await
またがなければSend
でないデータでも使える。
spawn;
これはOK。
spawn;
これはRc
がSend
でないのでコンパイルエラー。
Shared state
redis serverを実装するにあたって状態を保持する必要があります。状態を共有する方法として例えば以下の2つが考えられます。
Mutex
でガードして保持する。- stateを管理する専用のtaskをspawnしてchannelを通じてやりとりする。
シンプルなデータでは最初の方法が適していて、I/O等の非同期処理が必要になってくると2つ目の方式が適している。
今回の実装では状態はHashMap
でメモリに保持するのでMutex
を利用した方式で実装している。(channelを使う方式はのちのちふれる)
bytes
crate
byte streamを表現するのにVec<u8>
でなくBytes
を利用するためにbytes
crateを依存先に追加します。tokioのversionが0.3にあがったので、0.6
を指定します。
Cargo.toml
= "0.6"
HashMap
の共有
use TcpListener;
use HashMap;
use ;
async
HashMap
にMutex
でinterior mutabilityを付与して、Arc
で複数threadでownできるようにします。
tokio::sync::Mutex
ではなくstd::sync::Mutex
を利用していることに注意してください。
tokio::sync::Mutex
は取得したlockが.await
をまたぐ際に利用するそうです。 std
とtokio
のMutex
の使い分けがずっとわかっていなかったので疑問点がひとつ解消できてうれしいです。
競合が少なく、取得したロックが.await
をまたがない場合はasync内でsynchronous mutexを利用してもよいそうです。
ただし注意点としてロック取得によるblockはそのtaskだけでなくtaskを実行しているthreadにscheduleされている他のtaskもブロックするので注意が必要です。これはmutexに限った話ではなくasync内でblockするAPI呼ぶ場合の一般的注意事項といえそうです。
また、parking_lot::Mutex
の利用も選択肢にあるそうなのですが、よくわかっていません。Rustで時々でてくるparking
についてはいずれ調べていきたいです。
mutexのロックが問題になった場合の選択肢として
- 共有していたstateの管理専用のtaskを用意して、message passingを利用する。
- mutexを分割する
- そもそもmutexを利用しないようにする
たとえば以下のようにしてHashMap
を分割してロックの競合する頻度をさげることができるそうです。またdashmapがsharded hash mapの機能を提供しています。
type ShardedDb = ;
let shard = db.lock.unwrap;
shard.insert;
Channels
クライアントサイドからみていきます。まず書きたいのは以下のような処理だとします。
use client;
async
このコードはコンパイルできません。client
はcopyでないので所有権の問題がありますし、Client::set
は&mut self
を要求するので排他制御が必要になってきます。
そこでmessage passingというパターンを利用します。client
リソースを管理する専用のtaskをspawnして、clientに処理を依頼したいtaskはclient taskに処理を依頼するmessageを送る形にします。
このパターンを使うと、接続するconnectionは1本で済みclientをmanageするtaskはclientに排他的にアクセスできます。またchannelはbufferとしても機能するので処理のproducerとconsumerの速度差を吸収してくれます。
tokioのchannel primitives
tokioは目的ごとに以下のchannel primitiveを用意してくれています。
mpsc
: multi-producer, single consumer用channeloneshot
: 一度だけの値の通知に利用できるbroadcast
: 送ったmessageはそれぞれのreceiverに届くwatch
: single-producer, multi-consumer.receiverは最新の値だけうけとれる。
std::sync::mpsc
やcrossbeam::channel
はthreadをblockしてしまうのでasyncで使うには適さないそうです。以下ではmpsc
とoneshot
を利用していきます。
use Bytes;
use client;
use ;
type Responder<T> = Sender;
async
ポイントはclient taskに処理を依頼するCommand
に結果を通知するためのfiledが用意してあるところでしょうか。
Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
と定義して、依頼したコマンドの結果をうけとれるようになっています。
処理を依頼するchannelと結果をつけとるchannelで別のprimitiveを利用するこのパターンは非常に参考になりました。Goだと両方ともchan
になりますが、oneshot
のほうが意図がでていいなと思います。
I/O
AsyncRead
/AsyncWrite
Futureのpoll
を直接呼ぶようなコードを書かないようにAsyncRead
/AsyncWrite
traitを直接よぶことは基本的になく、それぞれに対応しているAsyncReadExt
/AsyncWriteExt
を利用します。
use ;
use File;
async
use ;
use File;
async
fileの内容をすべて読んだり、書き込んだりするのはこのようになります。同期的な書き方と同じですね。
Echo server
ということでI/Oといえばechoということでecho serverを実装していきます。
server sideは以下のようになります
use io;
use TcpListener;
async
async
async
自分でbufferを確保してloopでreadする方法とtokio::io::copy
を利用する方法があります。 io::copy
はreaderとwriterそれぞれに&mut
を要求するので
io::copy(&mut socket, &mut socket).await
としたいところですが、参照の制約からそれができません。そこで、io::split
を利用します。(ただしTcpStream
は自前で用意している)
split
APIをみるといつかtwitterで流れてきたこの画像がいつも思い出されます。
電車でDのmemeみたいです。
clientは以下のようになります。
use ;
use TcpStream;
async
split
でwriterをmoveで渡しています。
Framing
次にTcpStream
をwrapしてbyte streamからredisの各種コマンドAPIを提供しているConnection
を実装していきます。 ここでいうframeとはclient/server間で送られるデータの単位という感じでしょうか。(A frame is a unit of data transmitted between two peers.)
1つ以上のframeでprotocolにおけるmessageになると考えています。 今回実装しようとしているRedis wire protocolについてはこちら
実装に必要な範囲でまとめると以下のようになります。
RESP(REdis Serialization Protocol)
https://redis.io/topics/protocol
first byteでdataのtypeを判定できる
+
Simple Strings-
Errors:
Integers$
Bulk Strings*
Arrays
protocolは常に\r\n
(CRLF)でterminated
RESP Simple Strings
+
に続いてCRとLFを含まない文字列で構成される。
成功を表すOKは以下のように5byte。
+OK\r\n
RESP Errors
エラー用のdata type. 実体としてはSimple Stringsだがprefixが-
で区別される。
clientに例外として扱われ、内容はエラーメッセージ。
-Error message\r\n
-ERR unknown command 'foobar'
-WRONGTYPE Operation against a key holding the wrong kind of value
ERR
のあとにWRONGTYPE
のような具体的なエラー種別を返すのはRedisの慣習でRESPのError Formatではない。
RESP Integers
CRLF terminatedなstringでintegerにparseできる。prefixは:
。
signed 64bit integerのrangeであることが保証されている。
:0\r\n
:1000\r\n
RESP Bulk Strings
512MBまでの任意のbyte列を保持できるデータ。redisのdocumentではbinary safeと言われている。
binary safe stringの意味については予め長さが分かっていて特定の文字による終端を前提にしておらず、任意のbyte列を保持できるということだと思う。
$6\r\nfoobar\r\n
: "foobar".$0\r\n\r\
: empty string.$-1\r\n
: non-existenceを表現。
RESP Arrays
複数のdata typeを表すdata type.要素数はprefix*
のあとに明示される。
*0\r\n
: empty array.*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
: "foo", "bar"を表す。*-1\r\n
: Null Array.
Sending commands to a Redis Server
clientからserverにLLEN mylist
を送るリクエストは以下のようになる。
*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
- -
frameの実装
上記のredis data typeをRustで表現すると
use Bytes;
のようになります。シンプルですね。RedisのCommandは複数Frame(Frame::Array
)で表現されているので、ユーザとしてはtcp socket(reader)を渡してコマンドを返してくれるような処理が欲しくなります。
server sideのframeを読む処理は以下のようになります。
use Bytes;
use Frame;
use HashMap;
use ;
use ;
use Connection;
async
Connection::read_frame()
がFrame::Array
を返し、Frame::Array
からRedisのCommand
に変換しています。
ということで、byte streamからFrame::Array
に変換する処理を見ていきます。
Buffered reads
use ;
use ;
use Incomplete;
use ;
use ;
use TcpStream;
Connection
はTcpStream
とframeをparseするためのbufferを保持しています。bufferの型としてbytes::BytesMut
を利用しています。
impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W>
が定義されているので、Connection.stream
はAsyncReader
として利用できます。 read_frame
が呼ばれると読み込んだbufferからFrameが生成できればFrameを返しbufferを更新します。Frameを生成するまでのbufferが足りなければtcp streamからreadします。
なにかをparseする際は先読みすることが必要となることが多いと思いますが、ここではstd::io::Cursor
でwrapしてからFrame::check
にbufferを渡すことでbufferのpositionを変更することなくparse処理を委譲できています。Cursor
のこのような使い方は非常に参考になります。
buffered writes
pub async
async
write処理はRedisのprotocolにしたがってFrame
をencodeして書き込んでいきます。BytesMut
はsystem callの回数を抑えるために書き込みをbufferするので、最後にflush
を呼びます。このあたりは同期処理と同じですね。
APIの設計上Connection
の呼び出し側にいつflush
するかを制御させる設計もありえると言及されていました。
Async in depth
ここまででasyncとtokioの一通りの機能に触れたのでfutureについてもう少し見ていきます。
まず指定された期間が経過したらstdoutに挨拶を表示するfutureを実装してみます。
use Future;
use Pin;
use ;
use ;
async
このコードはこんな感じに展開されるそうです。
use Future;
use Pin;
use ;
use ;
このコードをみて.await
の挙動だったり、.await
またぐときの制約だったりがいろいろ腹落ちしました。futureはstate machinesだ、みたいな記述はこのことを言わんとしていたんですね。
async block内で.await
を使うたびにenum
で定義されたStateが増えていくことになるんですね。"zero cost abstractions"はだてじゃない。
Mini Tokio Executor
async block/fnがpoll
に変換されるということは誰かがこのpoll
を呼び出す必要があります。それがtokio(runtime)が提供しているexecutorです。
use channel;
use ;
use RefCell;
use Future;
use Pin;
use ;
use ;
use thread;
use ;
thread_local!
executorとしてMiniTokio
を定義しています。MiniTokio::spawn
で渡されたfutureをTask
でwrapしてchannelのSenderをcloneして保持させています。
Delay
のfuture実装では別threadを起動して指定期間経過後にwakerをwakeします。
futures::task::ArcWake
traitを実装してあると、impl ArcWake
-> Waker
-> Context
と作成できVTableを自分で作らなくてもpoll
できるようになるようです。
この実装ではwake_ by_ref
では単純に自分を再度channelにsendしてexecutorのpoll
対象になるようにしています。
ものすごく簡易的な実装だと思いますがExecutorの雰囲気はつかめたような気がします。 tokioのRuntime::block_on
の実装をおってみたところまったくわかりませんでしたが(特にparkの概念)、以下のように確かにfutureをpoll
しているloopがありました。
https://github.com/tokio-rs/tokio/blob/c30ce1f65c5befb2a4b48eb4c16b7da3c0eafbd1/tokio/src/park/thread.rs#L263
loop
Select
tokio::spawn
がgoroutineの生成に対応するならgoっぽく書けるんじゃと思っていましたがそのためにはひとつ重要な予約語がたりません。そうselect
です。
これがないと最初のシグナルハンドリングがそもそも書けないです。(都度チェックする以外)
ということで、tokio::select!
を見ていきます。マクロなのはしょうがないです。(cargo expandするとformatされていないコードが出力されてしまったので載せるのはあきらめました)
use oneshot;
async
ふたつのchannelの結果を待機するコードはこのようになります。
<pattern> = <async expression> => <handler>,
select!
のsyntaxはこのように表されます。
すべての<async expression>
はひとつにまとめられてconcurrentに実行され、あるexpressionの実行が完了して、結果が<pattern>
にマッチすると<handler>
が実行されます。
結果的に実行される<handler>
は必ずひとつのbranchです。
また、<async expression>
は同じtaskとして実行されるので同時に実行されることはない(はず)です。(task内concurrency)
このあたりの仕様はRustのborrow checkerに影響してきます。(handlerではmutable borrowをとれる)
これでsignalのような終了処理を伝播させるような処理もかけそうです。
use TcpStream;
use oneshot;
async
ただしtokioのsignal APIをみてみるとsignalの種類ごとにfutureを生成する必要がありそうなので複数種類のsignal処理を書く場合はそれぞれ生成しておく必要がありそうです。
use ;
async
Streams
駆け足でtutorialの各項目をさらってきましたが最後はStreamです。
Future<Output = T>
がasyncなT
だとしたらStream<Item = T>
はasyncなIterator<Item = T>
という関係のようです。(https://docs.rs/futures-core/0.3.6/futures_core/stream/trait.Stream.html)
I/Oと同様にfutures_core::stream::Stream
はpoll_next
しか定義しておらず、iteratorのような各種APIを利用するにはtokio::stream::StreamExt
を利用します。 redisのpub/subっぽいことをするコードは以下のようになります。(mini-redis-server
を起動しておきます)
use client;
use StreamExt;
async
async
async
subscriber.into_stream
でsubscriberをconsumeしたあと、StreamExt
を利用してadaptor処理を追加しています。このあたりの使用感はiteratorと同じですね。
注意が必要なのはnext
を呼ぶ前にtokio::pin!
という見慣れないマクロをよんでいることです。
next
を呼ぶためにはstreamがpinned
されている必要があり、into_stream
はpin
されていないStream
を返しています。このtokio::pin!
を忘れるとものすごいコンパイルエラーメッセージとともに、pin
する必要があるとコンパイラーから注意してもらえます。(.... cannot be unpined
)
pin
についてはasync bookやOPTiMさんのPinチョットワカルを参考にさせていただきました。
自分の理解ではPin<T>
としておくとメモリ上で動かしてはいけない型(Futureを実装したstruct)が&mut self
をとれなくなり結果的にstd::mem::replace
等が使えなくなり安全になるという感じです。
なぜFutureの実装をメモリから動かせないかというと.await
をまたいた変数はstructのfieldに変換されるからとういことでしょうか。
終わりに
tokio tutorialを写しながら動かしていってだいぶasync/tokioのAPIに慣れてきました。まだまだはまりどころありそうですが少なくとも今まで書いていた同期処理のコードをtokioを使って書き直すとかはできるようなきがしてきました。また、Mini-Redisの実装では触れられなかったところでも参考になりそうな箇所が多くあり(protocolのparseのところ等)参考にしていきたいと思っています。
tokioのblog記事 Announcing Tokio 0.3 and the path to 1.0では2020年12月の終わり頃に1.0のリリースを計画していることが書かれています。
tokio1.0では
- A minimum of 5 years of maintenance.
- A minimum of 3 years before a hypothetical 2.0 release.
というstability guaranteesへのcommitが宣言されています。すごいですね、オープンソースで5年メンテします!と宣言しているものはかなり少ないのではないでしょうか。
これなら安心してtokio使っていけますね。
参考document
- async book. TODOが目立つがExecutorの簡易的な実装例がのっている。
- keen先生のblog.
RawWakrVTable
からContextを作っている。 - 非同期Rustの動向調査
- OPTiMさんのtech blog
- Writing As OS in Rustのasync/awaitの章
https://github.com/tokio-rs/tokio/releases/tag/tokio-0.3.0
https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll