この記事ではRustの非同期runtimeのひとつtokioの公式Tutorialを通じてtokioのAPIに入門していきます。
TutorialではMini-RedisというRedisのclient/serverを実装したlibraryを通してtokioとfuture/asyncの概念を学んでいきます。Redisについての前提知識は必要とされていません。
Rustでasync/awaitが使えるになりましたが、実際にアプリケーションを書くにはruntimeを選択する必要があります。今だとtokioかasync-stdが現実的な選択肢なのでしょうか。非同期のruntimeを選択すると基本的にI/Oをともなう処理はすべて(?)選択したruntimeのAPIを利用することになると思います。そのため、Rustの非同期ecosystemの恩恵にあずかるにはruntime/tokioのAPIになれておく必要があります。
まとめ
本tutorialを通して以下のことを学べました。
std::sync::Mutexとtokio::sync::Mutexの使い分け方.awaitと書いたときにどんなことがおきるかのメンタルモデルができるMutexによる状態共有からmpscとoneshotchannelを利用したパターンへの移行Frameという概念(byte stream -> frame -> protocol)bytes::{BytesMut,Bytes}の利用例- futureを
pollするexecutorの概要 select!でgoっぽく書けるStreamにはpinどめが必要
サンプルコードが豊富でgithubではより細かくコメントが書いてあります。
準備
rustは1.47を利用しました、最新であれば特に問題ないと思います。minimumは1.39.0です。 rustc 1.47.0 (18bf6b4f0 2020-10-07)
この記事を書いている1週間ほど前にtokio v0.3.0がリリースされました。ですがmini-redisはtokioの0.2に依存しているので0.2で進めていきます。 bytes crateへの依存がpublic APIから削除されたので、read_buf() -> read()に変更する以外は特に影響ないです。
mini-redisもtokio v0.3を利用するようになりました。
Mini-Redis server
client側のコードを書く際にserverが起動していると便利なのでmini-redisを動かせるようにします。
$ cargo install mini-redis
$ mini-redis-server
cargo installでもってくることもできますが、ソースから動かすことにします。
git clone https://github.com/tokio-rs/mini-redis
cd mini-redis
cargo run --bin mini-redis-server
別terminalで
cargo run --bin mini-redis-cli --quiet set xxx xxx
cargo run --bin mini-redis-cli --quiet get xxx
"xxx"
とできればOKです。
My-Redis
cargo new my-redis
cd my-redis
Cargo.toml
tokio = {version = "0.3.1", features = ["full"]}
mini-redis = "0.3"
projectを作成して、tokioとmini-redisを依存先に追加します。
楽をしてfeaturesにfullを指定していますが、実際には利用する機能にあわせてnet, fs, rt-threadedのように指定します。API documentに利用するために必要なfeatureが記載されています。
v0.3ではrt-coreとrt-utilがrtに、tcp, udp, dnsがnetにまとめられる等して整理されています。1
これで準備が整ったので早速コードを書いていきましょう!
Sourceはこちら (branchがmasterからmainになっています。)
Hello Tokio
tutorialではmain.rsに書いていますが残しておきたいのでexamples/hello-redis.rsを作成します。
use ;
pub async
別terminalでmini-redisが起動してある前提で
$ cargo run --quiet --example hello-redis
got value from the server; result = Some(b"world")
となれば成功です。
Attribute Macro tokio::main
まずいきなりうっ..となったのがtokio::mainmacroです。main関数からいきなり隠蔽されるのは抵抗ないでしょうか。ということでcargo expandで展開内容をみていきます。
use *;
extern crate std;
use ;
このような展開結果となりました。概要としてはruntimeをBuilder patternで設定してユーザのコードをasync blockでwrapしたうえでblock_on()に渡している感じです。
documentによるとRuntimeのセットアップをユーザがRuntimeやBuilderを直接利用することなくできるようにするためのhelperという位置づけのようです。
v0.3.0では
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
#[tokio::main(flavor = "current_thread")]
のようにmulti/single threadの切り替えやworker thread数を制御できるようです。
このruntimeがなにをやってくれているかは後述します。
ちなみにv0.3.0では以下のように展開されました。
new_multi_thread
.enable_all
.build
.unwrap
.block_onruntimeの設定処理の理解を試みたのですが歯がたちませんでした。
worker thread数は指定しない場合seanmonstar先生のnum_cpusが利用され論理コア数が使われているようです。
awaitのメンタルモデル
async fn/blockの中でawaitを書いたらなにが起きるのピンと来ていませんでした。なのでせめて概念的にでも.await書いたら実はこうなってるというメンタルモデル(あくまで自分の)を得るのが目標でしたが、本tutorialを行い現状では以下のように考えています。
- async fn/blockは
Future::poll2の実装に変換される。 Futureを実装したanonymous structは最終的にはtaskという形でruntimeに渡されてpoll()を呼んでもらえる。future_a.awaitと書いたコードは以下のように変換される。(詳しくは後述します)
let value = match future_as_mut.poll
- 結果的にユーザはfutureが完了したあとの処理だけを記述すればよく、非同期処理が同期処理っぽく書ける。
Spawning
client側のコードを書いたので次はserver側のコードを書いていきます。こちらはsrc/main.rsに記述します。
use ;
use ;
async
async
このコードでは、processが完了するまで次の接続を受け付けていないので同時に1つの接続しか処理されません。processをconcurrentにおこなうには以下のようにします。
use TcpListener;
async
processをawaitしているfutureをtokio::spawnに渡します。tokio::spawnはfutureの実行単位で自分はgo func() { ... }のようなgoroutineと考えています。ただし、goroutineと違うのは戻り値を取得できるところです。(goroutineの場合はchan等を使う必要がある)
async
Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory. Applications should feel free to spawn thousands, if not millions of tasks.
ということでたった64bytesのallocationしか必要とせず、ある程度は気にせずにspawnしてもいいみたいです。
Concurrency and Parallelism
いわゆる並行と並列の違いについても言及があります。
Concurrency and parallelism is not the same thing. If you alternate between two tasks, then you are working on both tasks concurrently, but not in parallel. For it to qualify as parallel, you would need two people, one dedicated to each task.
concurrentとparallelの違いについてはGo言語による並行処理 2章並行性をどうモデル化するかで書かれている以下の文がいちばんわかりやすいと思っています。
並行性はコードの性質を指し、並列性は動作しているプログラムの性質を指します。
この本は本当に名著だと思い原文のConcurrency in Goも読みました。原文では以下のように書かれています。
Concurrency is a property of the code; parallelism is a property of the running program
そして
The first is that we do not write parallel code, only concurrent code that we hope will be run in parallel. Once again, parallelism is a property of the runtime of our program, not the code.
というわけで、コードでは並行かそうでないかだけが制御できるという姿勢でいるようになりました。
'static bound
tokio::spawnの定義は以下のようになっています。
ⓘ
where
T: Future + Send + 'static,
Output: Send + 'static,
ここでT: 'staticとなっているとそれは"lives forever"とよく誤解されているがそうではないという注意があります。(このCommon Rust Lifetime Misconceptionsは非常に参考になったので別で記事を書こうと思っています。)
T: 'staticはTの所有者はTを保持している限りデータが無効になることはないと保証されているので、プログラムの終了までを含めて無期限にデータを保持できると読めて、"T is bounded by a 'static lifetime"と読むべきで、"T has a 'static lifetime"と読まないと自分は理解しています。
要はT: 'staticだったらTはowned typeか'static lifetimeの参照しかfieldにもたない型ということ。
Send bound
tokio::spawnに渡されるfutureはSendをimplementしている必要がある。taskがSendになるには、.awaitをまたぐすべてのデータがSend である必要がある。逆にいうと.awaitまたがなければSendでないデータでも使える。
spawn;
これはOK。
spawn;
これはRcがSendでないのでコンパイルエラー。
Shared state
redis serverを実装するにあたって状態を保持する必要があります。状態を共有する方法として例えば以下の2つが考えられます。
Mutexでガードして保持する。- stateを管理する専用のtaskをspawnしてchannelを通じてやりとりする。
シンプルなデータでは最初の方法が適していて、I/O等の非同期処理が必要になってくると2つ目の方式が適している。
今回の実装では状態はHashMapでメモリに保持するのでMutexを利用した方式で実装している。(channelを使う方式はのちのちふれる)
bytes crate
byte streamを表現するのにVec<u8>でなくBytesを利用するためにbytes crateを依存先に追加します。tokioのversionが0.3にあがったので、0.6を指定します。
Cargo.toml
= "0.6"
HashMapの共有
use TcpListener;
use HashMap;
use ;
async
HashMapにMutexでinterior mutabilityを付与して、Arcで複数threadでownできるようにします。
tokio::sync::Mutexではなくstd::sync::Mutexを利用していることに注意してください。
tokio::sync::Mutexは取得したlockが.awaitをまたぐ際に利用するそうです。 stdとtokioのMutexの使い分けがずっとわかっていなかったので疑問点がひとつ解消できてうれしいです。
競合が少なく、取得したロックが.awaitをまたがない場合はasync内でsynchronous mutexを利用してもよいそうです。
ただし注意点としてロック取得によるblockはそのtaskだけでなくtaskを実行しているthreadにscheduleされている他のtaskもブロックするので注意が必要です。これはmutexに限った話ではなくasync内でblockするAPI呼ぶ場合の一般的注意事項といえそうです。
また、parking_lot::Mutexの利用も選択肢にあるそうなのですが、よくわかっていません。Rustで時々でてくるparkingについてはいずれ調べていきたいです。
mutexのロックが問題になった場合の選択肢として
- 共有していたstateの管理専用のtaskを用意して、message passingを利用する。
- mutexを分割する
- そもそもmutexを利用しないようにする
たとえば以下のようにしてHashMapを分割してロックの競合する頻度をさげることができるそうです。またdashmapがsharded hash mapの機能を提供しています。
type ShardedDb = ;
let shard = db.lock.unwrap;
shard.insert;
Channels
クライアントサイドからみていきます。まず書きたいのは以下のような処理だとします。
use client;
async
このコードはコンパイルできません。clientはcopyでないので所有権の問題がありますし、Client::setは&mut selfを要求するので排他制御が必要になってきます。
そこでmessage passingというパターンを利用します。client リソースを管理する専用のtaskをspawnして、clientに処理を依頼したいtaskはclient taskに処理を依頼するmessageを送る形にします。
このパターンを使うと、接続するconnectionは1本で済みclientをmanageするtaskはclientに排他的にアクセスできます。またchannelはbufferとしても機能するので処理のproducerとconsumerの速度差を吸収してくれます。
tokioのchannel primitives
tokioは目的ごとに以下のchannel primitiveを用意してくれています。
mpsc: multi-producer, single consumer用channeloneshot: 一度だけの値の通知に利用できるbroadcast: 送ったmessageはそれぞれのreceiverに届くwatch: single-producer, multi-consumer.receiverは最新の値だけうけとれる。
std::sync::mpscやcrossbeam::channelはthreadをblockしてしまうのでasyncで使うには適さないそうです。以下ではmpscとoneshotを利用していきます。
use Bytes;
use client;
use ;
type Responder<T> = Sender;
async
ポイントはclient taskに処理を依頼するCommandに結果を通知するためのfiledが用意してあるところでしょうか。
Responder<T> = oneshot::Sender<mini_redis::Result<T>>;と定義して、依頼したコマンドの結果をうけとれるようになっています。
処理を依頼するchannelと結果をつけとるchannelで別のprimitiveを利用するこのパターンは非常に参考になりました。Goだと両方ともchanになりますが、oneshotのほうが意図がでていいなと思います。
I/O
AsyncRead/AsyncWrite
Futureのpollを直接呼ぶようなコードを書かないようにAsyncRead/AsyncWrite traitを直接よぶことは基本的になく、それぞれに対応しているAsyncReadExt/AsyncWriteExtを利用します。
use ;
use File;
async
use ;
use File;
async
fileの内容をすべて読んだり、書き込んだりするのはこのようになります。同期的な書き方と同じですね。
Echo server
ということでI/Oといえばechoということでecho serverを実装していきます。
server sideは以下のようになります
use io;
use TcpListener;
async
async
async
自分でbufferを確保してloopでreadする方法とtokio::io::copyを利用する方法があります。 io::copyはreaderとwriterそれぞれに&mutを要求するので
io::copy(&mut socket, &mut socket).await
としたいところですが、参照の制約からそれができません。そこで、io::splitを利用します。(ただしTcpStreamは自前で用意している)
split APIをみるといつかtwitterで流れてきたこの画像がいつも思い出されます。
電車でDのmemeみたいです。
clientは以下のようになります。
use ;
use TcpStream;
async
splitでwriterをmoveで渡しています。
Framing
次にTcpStreamをwrapしてbyte streamからredisの各種コマンドAPIを提供しているConnectionを実装していきます。 ここでいうframeとはclient/server間で送られるデータの単位という感じでしょうか。(A frame is a unit of data transmitted between two peers.)
1つ以上のframeでprotocolにおけるmessageになると考えています。 今回実装しようとしているRedis wire protocolについてはこちら
実装に必要な範囲でまとめると以下のようになります。
RESP(REdis Serialization Protocol)
https://redis.io/topics/protocol
first byteでdataのtypeを判定できる
+Simple Strings-Errors:Integers$Bulk Strings*Arrays
protocolは常に\r\n(CRLF)でterminated
RESP Simple Strings
+に続いてCRとLFを含まない文字列で構成される。
成功を表すOKは以下のように5byte。
+OK\r\n
RESP Errors
エラー用のdata type. 実体としてはSimple Stringsだがprefixが-で区別される。
clientに例外として扱われ、内容はエラーメッセージ。
-Error message\r\n
-ERR unknown command 'foobar'
-WRONGTYPE Operation against a key holding the wrong kind of value
ERRのあとにWRONGTYPEのような具体的なエラー種別を返すのはRedisの慣習でRESPのError Formatではない。
RESP Integers
CRLF terminatedなstringでintegerにparseできる。prefixは:。
signed 64bit integerのrangeであることが保証されている。
:0\r\n:1000\r\n
RESP Bulk Strings
512MBまでの任意のbyte列を保持できるデータ。redisのdocumentではbinary safeと言われている。
binary safe stringの意味については予め長さが分かっていて特定の文字による終端を前提にしておらず、任意のbyte列を保持できるということだと思う。
$6\r\nfoobar\r\n: "foobar".$0\r\n\r\: empty string.$-1\r\n: non-existenceを表現。
RESP Arrays
複数のdata typeを表すdata type.要素数はprefix*のあとに明示される。
*0\r\n: empty array.*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n: "foo", "bar"を表す。*-1\r\n: Null Array.
Sending commands to a Redis Server
clientからserverにLLEN mylistを送るリクエストは以下のようになる。
*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
- -frameの実装
上記のredis data typeをRustで表現すると
use Bytes;
のようになります。シンプルですね。RedisのCommandは複数Frame(Frame::Array)で表現されているので、ユーザとしてはtcp socket(reader)を渡してコマンドを返してくれるような処理が欲しくなります。
server sideのframeを読む処理は以下のようになります。
use Bytes;
use Frame;
use HashMap;
use ;
use ;
use Connection;
async
Connection::read_frame()がFrame::Arrayを返し、Frame::ArrayからRedisのCommandに変換しています。
ということで、byte streamからFrame::Arrayに変換する処理を見ていきます。
Buffered reads
use ;
use ;
use Incomplete;
use ;
use ;
use TcpStream;
ConnectionはTcpStreamとframeをparseするためのbufferを保持しています。bufferの型としてbytes::BytesMutを利用しています。
impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W>が定義されているので、Connection.streamはAsyncReaderとして利用できます。 read_frameが呼ばれると読み込んだbufferからFrameが生成できればFrameを返しbufferを更新します。Frameを生成するまでのbufferが足りなければtcp streamからreadします。
なにかをparseする際は先読みすることが必要となることが多いと思いますが、ここではstd::io::CursorでwrapしてからFrame::checkにbufferを渡すことでbufferのpositionを変更することなくparse処理を委譲できています。Cursorのこのような使い方は非常に参考になります。
buffered writes
pub async
async
write処理はRedisのprotocolにしたがってFrameをencodeして書き込んでいきます。BytesMutはsystem callの回数を抑えるために書き込みをbufferするので、最後にflushを呼びます。このあたりは同期処理と同じですね。
APIの設計上Connectionの呼び出し側にいつflushするかを制御させる設計もありえると言及されていました。
Async in depth
ここまででasyncとtokioの一通りの機能に触れたのでfutureについてもう少し見ていきます。
まず指定された期間が経過したらstdoutに挨拶を表示するfutureを実装してみます。
use Future;
use Pin;
use ;
use ;
async
このコードはこんな感じに展開されるそうです。
use Future;
use Pin;
use ;
use ;
このコードをみて.awaitの挙動だったり、.awaitまたぐときの制約だったりがいろいろ腹落ちしました。futureはstate machinesだ、みたいな記述はこのことを言わんとしていたんですね。
async block内で.awaitを使うたびにenumで定義されたStateが増えていくことになるんですね。"zero cost abstractions"はだてじゃない。
Mini Tokio Executor
async block/fnがpollに変換されるということは誰かがこのpollを呼び出す必要があります。それがtokio(runtime)が提供しているexecutorです。
use channel;
use ;
use RefCell;
use Future;
use Pin;
use ;
use ;
use thread;
use ;
thread_local!
executorとしてMiniTokioを定義しています。MiniTokio::spawnで渡されたfutureをTaskでwrapしてchannelのSenderをcloneして保持させています。
Delayのfuture実装では別threadを起動して指定期間経過後にwakerをwakeします。
futures::task::ArcWake traitを実装してあると、impl ArcWake -> Waker -> Contextと作成できVTableを自分で作らなくてもpollできるようになるようです。
この実装ではwake_ by_refでは単純に自分を再度channelにsendしてexecutorのpoll対象になるようにしています。
ものすごく簡易的な実装だと思いますがExecutorの雰囲気はつかめたような気がします。 tokioのRuntime::block_onの実装をおってみたところまったくわかりませんでしたが(特にparkの概念)、以下のように確かにfutureをpollしているloopがありました。
https://github.com/tokio-rs/tokio/blob/c30ce1f65c5befb2a4b48eb4c16b7da3c0eafbd1/tokio/src/park/thread.rs#L263
loop
Select
tokio::spawnがgoroutineの生成に対応するならgoっぽく書けるんじゃと思っていましたがそのためにはひとつ重要な予約語がたりません。そうselectです。
これがないと最初のシグナルハンドリングがそもそも書けないです。(都度チェックする以外)
ということで、tokio::select!を見ていきます。マクロなのはしょうがないです。(cargo expandするとformatされていないコードが出力されてしまったので載せるのはあきらめました)
use oneshot;
async
ふたつのchannelの結果を待機するコードはこのようになります。
<pattern> = <async expression> => <handler>,
select!のsyntaxはこのように表されます。
すべての<async expression>はひとつにまとめられてconcurrentに実行され、あるexpressionの実行が完了して、結果が<pattern>にマッチすると<handler>が実行されます。
結果的に実行される<handler>は必ずひとつのbranchです。
また、<async expression>は同じtaskとして実行されるので同時に実行されることはない(はず)です。(task内concurrency)
このあたりの仕様はRustのborrow checkerに影響してきます。(handlerではmutable borrowをとれる)
これでsignalのような終了処理を伝播させるような処理もかけそうです。
use TcpStream;
use oneshot;
async
ただしtokioのsignal APIをみてみるとsignalの種類ごとにfutureを生成する必要がありそうなので複数種類のsignal処理を書く場合はそれぞれ生成しておく必要がありそうです。
use ;
async
Streams
駆け足でtutorialの各項目をさらってきましたが最後はStreamです。
Future<Output = T>がasyncなTだとしたらStream<Item = T>はasyncなIterator<Item = T>という関係のようです。(https://docs.rs/futures-core/0.3.6/futures_core/stream/trait.Stream.html)
I/Oと同様にfutures_core::stream::Streamはpoll_nextしか定義しておらず、iteratorのような各種APIを利用するにはtokio::stream::StreamExtを利用します。 redisのpub/subっぽいことをするコードは以下のようになります。(mini-redis-serverを起動しておきます)
use client;
use StreamExt;
async
async
async
subscriber.into_streamでsubscriberをconsumeしたあと、StreamExtを利用してadaptor処理を追加しています。このあたりの使用感はiteratorと同じですね。
注意が必要なのはnextを呼ぶ前にtokio::pin!という見慣れないマクロをよんでいることです。
nextを呼ぶためにはstreamがpinnedされている必要があり、into_streamはpinされていないStreamを返しています。このtokio::pin!を忘れるとものすごいコンパイルエラーメッセージとともに、pinする必要があるとコンパイラーから注意してもらえます。(.... cannot be unpined)
pinについてはasync bookやOPTiMさんのPinチョットワカルを参考にさせていただきました。
自分の理解ではPin<T>としておくとメモリ上で動かしてはいけない型(Futureを実装したstruct)が&mut selfをとれなくなり結果的にstd::mem::replace等が使えなくなり安全になるという感じです。
なぜFutureの実装をメモリから動かせないかというと.awaitをまたいた変数はstructのfieldに変換されるからとういことでしょうか。
終わりに
tokio tutorialを写しながら動かしていってだいぶasync/tokioのAPIに慣れてきました。まだまだはまりどころありそうですが少なくとも今まで書いていた同期処理のコードをtokioを使って書き直すとかはできるようなきがしてきました。また、Mini-Redisの実装では触れられなかったところでも参考になりそうな箇所が多くあり(protocolのparseのところ等)参考にしていきたいと思っています。
tokioのblog記事 Announcing Tokio 0.3 and the path to 1.0では2020年12月の終わり頃に1.0のリリースを計画していることが書かれています。
tokio1.0では
- A minimum of 5 years of maintenance.
- A minimum of 3 years before a hypothetical 2.0 release.
というstability guaranteesへのcommitが宣言されています。すごいですね、オープンソースで5年メンテします!と宣言しているものはかなり少ないのではないでしょうか。
これなら安心してtokio使っていけますね。
参考document
- async book. TODOが目立つがExecutorの簡易的な実装例がのっている。
- keen先生のblog.
RawWakrVTableからContextを作っている。 - 非同期Rustの動向調査
- OPTiMさんのtech blog
- Writing As OS in Rustのasync/awaitの章
https://github.com/tokio-rs/tokio/releases/tag/tokio-0.3.0
https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll