最近会社でThis week in rustならぬThis week in fraimという会が週1で行われる様になりました。そこでCQRS and Event Sourcing using Rustというドキュメントを教えてもらいました。
このドキュメントはcqrs-rsというRustのCQRS lightweight frameworkについて書かれており、RustにおけるCQRSの実装の具体例としてとても参考になりました。
そこで本記事では、cqrs-rsとデモ実装であるcqrs-demoのコードを読んでRustにおけるCQRSの実装を追っていきます。
用語
本記事で利用する用語については以下の意味で利用します。
用語 | ここでの意味 |
---|---|
CQRS | application(domain)のmodelをwriteとreadに分けてるくらいの意味です。それぞれの永続化に利用するbackendも異なる場合もあります。 |
Aggregate | * DDDにおける集約のRoot * 関連するEntityとValueObjectの集合。実質Entity * 変更はAggregateに対してのみCommandを適用する形でのみなされる |
Command | Aggregateに対する作成/更新のリクエスト |
Event | Aggregateに対してCommandを適用した結果の表現 |
概要
全体の登場人物と処理の概要を説明します。
- CommandHandlerはRESTのhandlerやGraphQLのresolverのように外部から処理のリクエストをうけるレイヤーです。cqrs-esはこのレイヤーに特に関与しません。
- EventStoreはwriteの永続化処理を担うコンポーネントです。EventStoreが処理対象のaggregateに関するeventをloadしてきます。
- 永続化層から取得したeventsをaggregateに適用します。これによりaggregateが最新の状態になります。
- 作成/更新を表現したCommandを適用します。
- 成功した場合は新しいeventsが生成されます。
- 生成されたeventsを永続化します。同時に発せられたcommandのようなreadModifyWrite(conflict)の処理はここで行います。
- 永続化が成功したeventを関心のあるコンポーネントに届けるためにdispatchされます。
- 一般的にはwriteに対応するreadのmodelを更新する処理が行われます
- ClientからのReadの要求に対して更新されたviewを利用することで、commandの適用結果がみえるようになります。
以上が処理の概要になります。cqrs-esはこの処理の流れを提供してくれます。application全体に組み込まれるというよりaggregate(domain model)単位で利用できるところがlightweightと言われているところなのかなと思います。
Aggregate
cqrs-demoで利用されるaggregateはBankAccountで識別子のaccount_id
とcommandによって変化するbalance
をもっており、balance
の変更にはドメインのルール(0以下にならない等)が適用されます。
Command
BankAccount
aggregateに対する変更要求です。
作成とbalanceを変更するcommandが定義されています。
enumを使うことがapiから強制されます。
Event
BankAccount
aggregateに対してcommandを適用した結果起きるeventです。
ここではcommandと発生するeventが1:1になっていますが、commandに対してeventが発生しない場合や複数起きる場合もあります。
1 execute
// pub type PostgresCqrs<A> = CqrsFramework<A, PersistedEventStore<PostgresEventRepository, A>>;
async
axumのhandlerでcommandをdeserializeしてaggregate(bank account)のcqrs instanceを呼び出すコードです。 リクエスト時にaggregateの識別子(aggregate_id
), command(BankAccountCommand
), cqrs instance(PostgresCqrs<BankAccount>
)があればよいです。
MetadataExtensionはrequestからUseragentや現在時刻を付加する情報です。
write層で永続化されるのはあくまでeventですが、auditや運用の観点から永続化したい情報(処理時刻やclient ip等)はmetadataとして表現します。
3 apply
use Aggregate;
Eventをaggregateに適用して、状態を更新する。エラーは返せないので絶対に成功する。 cqrs frameworkが過去のeventでapplyを呼んでくれるのでaggregateは最新の状態になる。
また、各aggregateはcqrs_es::Aggregate
を実装することが要求され、その中でCommand, Event,Error, Servicesをそれぞれassociate typeで指定する。
CommandとEventはこれまで述べてきたもので、Errorはcommand適用時に発生するエラー。ServicesはCommand適用時の依存(api call等)。
4 handle
最新のaggregateに対してcommandをhandleすることでeventを算出する処理。引数は&self
なのでaggregate自身の状態は更新できず更新はeventで表現する。
CQRSFramework Components Diagram
1aggregateを担当するCqrsFramework
のcomponent間の概要。
- ユーザが実装するのは
DynamoEventRepository
PosgresEventRepository
といった永続化層からevent(SerializedEvent)のrecord/itemを取得する処理- Dynamo, Postgres, MySqlの実装はcqrsの別リポジトリで用意されている。
Query
commitされて永続化に成功したeventを反映させる処理
- cqrs instanceの初期化処理
CQRSFramework execute_with_metadata Sequence Diagram
ユーザはaggregateに対してのapplyとhandleを定義すればあとの取り回しはframework側が行ってくれる。
ただし、EventRepositoryの実装でeventのconflictを処理する必要がある。
SerializedEvent
https://github.com/serverlesstechnology/cqrs/blob/master/src/persist/serialized_event.rs#L11
各種Eventを直接serializeするのではなく、SerializedEvent
という中間表現にして取り回す。
したがって、writeの永続化ではmodelごとにtable(collection)を分けるのではなく一つのtableですべてのmodelのeventを永続化できる。(分けようと思えば分けれる)
event_upcast
pub
// impl example
/*
{
"UpdateAddress": {
"address": "912 Spring St",
"city": "Seattle",
"state": "WA"
}
}
*/
let upcaster = new;
https://github.com/serverlesstechnology/cqrs/blob/master/src/persist/serialized_event.rs#L61
https://doc.rust-cqrs.org/advanced_event_upcasters.html
applicationを運用していく中でEventにfieldを追加したくなる場合の機構。enumなので、UpdateAddress,UpdateAddressV2のようにvariantを増やすこともできるがそうではなく、SerializedEvent
をdeserializeする際に直接値を加工するレイヤーが用意されている。
このあたりの考慮はCQRS独自のものだと思いました。
events table
(
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
sequence bigint CHECK (sequence >= 0) NOT NULL,
event_type text NOT NULL,
event_version text NOT NULL,
payload json NOT NULL,
metadata json NOT NULL,
PRIMARY KEY (aggregate_type, aggregate_id, sequence)
);
具体例としてSerializedEvent
を永続化する場合のDDL。
aggregate_type, aggregate_id, sequenceに対してprimary key(=一意性制約)が貼ってあるので、競合するeventの書き込みがエラーになる。
このように永続化層に一意性制約が付与できれば実装はとても楽になる。
デモではPostgresの一意性制約違反をそのまま利用していた。
Eventsの書き込み競合をPostgresの機能で解決しているのでCqrsFrameworkとしてはEventRepositoryの実装に競合の解決を委ねている。
EventEnvelope
https://github.com/serverlesstechnology/cqrs/blob/master/src/event.rs#L61
Eventの永続化が成功すると各Queryに渡されるEvent。metadataやsequenceを渡すためにEventEnvelope
としてwrapされている。
HashMap<String,String>
なので何が入っているか型で表現できない。このあたりがopinionatedなところなのかなと思った。
Cqrs init
https://github.com/serverlesstechnology/cqrs-demo/blob/master/src/config.rs
わかりにくいが、cqrs instanceの初期化処理。viewとqueryで同じ実装を利用しているので同じ処理でconstructしている。
GenericQuery Component Diagram
Aggregateを更新した後に対応するviewを更新する処理は一般的なフローなので、そのための汎用的な処理はGenericQuery
として提供されている。
GenericQuery dispatch Sequence Diagram
ここでもユーザはviewの取得と更新処理、永続化をそれぞれ実装しておけばよい。
難しいのが、Query::dispatch
はエラーを返せないので、viewの更新に失敗してもcommandの処理自体は完結しているので、retryしない限りwriteとreadで整合性がとれなくなってしまう。
Test
ドメインのルールはEventの適用とCommandの処理に集約されているので、過去のeventsと現在のcommandに対して出力されるeventsの形ですべて定義できる。そのためのtest helperも用意されていた。
- 過去のevent, serviceのmock, 適用されるcommandから期待通りのeventが返されるかのtestがunit testで書ける。
- 外部サービスの呼び出しはmockを書く必要はある
cqrs_es::test::TestFramework
が用意されているので、eventのapplyやcommandのhandleまわりを書かなくてよい。
わかっていないこと
実装上はSnapshot関連の記述があるものの、demoのユースケースではでてこなかった(と思われる)ので、どのように活用するか理解できなかった。
余談
SourceOfTruthという型名がかっこいいと思った
https://docs.rs/cqrs-es/latest/src/cqrs_es/persist/event_store.rs.html#13
自分で試した感想
- Frameworkといいつつ、application全体を制御するものでないので割と薄い
- Read(view)に関してはかなり自由。
- readModifyWrite(event更新の衝突)については、aggregate_type,aggregate_id,sequenceに一意制約付与できればよいのでそれができればわりと実装は簡単そうだった。
- Aggregate関連はgenericになっているが、trait object(dyn)もでてくる。
- metadatadateに関してはHashMap<String,String>
- writeのmodel(aggregate)に関しては変更がしやすい。fieldの型を変えてもapplyで対応すればよい
- Eventはdeserializeする必要があるので互換性のない変更はしにくい(そのためのevent_upcaster)
- writeとviewで実質modelの定義が二つでてくる。
- メリットでもデメリットでもあると思う。viewのときだけ欲しい追加の情報のせたりできるが、大抵はwriteとread両方にはねるとおもう。
- query dispatchでエラーになってもwriteの処理は完了するので、適切にretryする必要がありそう。
- がんばった分、command → apply → resultant_event の処理にfieldの更新ロジックが集中するのでよいと思った
まとめ
CQRSは概念としてしか知らなかったのでRustでの具体的な実装を知れてうれしかった。
Aggregateにcommandを適用して結果をEventとして表現し、これをSourceOfTruthとして永続化するという発送はシンプルでわかりやすいと思った。
こうなってくると運用してみたいので、CQRSの設計に関する本も読んでみようと思った。(例えばThe Art of Immutable Architecture)
参考
- CQRS and Events Sourcing using Rust
- cqrs-demo
- cqrs-es
- github repository名がcqrsだが、cargo(package)はcqrs-es
- https://github.com/serverlesstechnology/postgres-espostgresのPersistedEventRepositoryの実装