adtech studio

【Akka】Akka Streamsがめっちゃ便利すぎて脳汁が出た話し

By raita

Akka AMoAd Scala

こんにちは!Smalgoの來田です。

注意:タイトルは過激ですが内容至って普通なチュートリアル記事です。

仕事でWorkerを作った時に使ってみてめっちゃ便利だと思ったのでAkka Streamsについて書きたいと思います!

まだまだ中の実装の深いところまで追えてるわけじゃないので間違っていたら教えてください

 

Akka Streamsとは

Reactive Streams(ノンブロッキングでback pressureな非同期ストリーム処理の標準仕様)のAkka実装

 

Back Pressureとは

非同期なストリーム処理の場合下記の問題が起きる

  • Publisher側の処理が早い場合Subscriber側のバッファーが溢れてしまう
  • Subscriberに遠慮してPublisher側の処理を抑えた場合は無駄が多くなってしまう

それをSubscriberが自分が処理できる量をPublisherにリクエストを送ることで無駄なくSubscriberが処理できる量を処理する仕組みがback pressure(背圧制御)

 

Akka Streamsの使い方

今回使用したversion

 

まず一番簡単な例

結果

この例でやってることはソースを見てもらえればほぼわかると思いますが

1..5までのInt型数値をPublisher(Actor)が供給し、受け取った数値に2を掛けるStage(Actor)に渡され受け取った数値をprintするSubscriber (Actor)が最終的に受け取り結果を返す

もちろんこの時source側がフックでパッシングされるわけではなくsink側のdemand requestによって1,  2, 3, 4, 5の順に送られるので、もしsink側でものすごい時間がかかる処理があった場合でも問題なく動作が行われます(back pressure)

また、基本的にSourceから下流に流した値の処理の順序は保証される(Futureを使ったものに関しては保証させる、させないの2つ方法があります)

 

次はちょっとこった例

結果

一個前の例と同じ動作をします

処理の流れが直感的に見えるので個人的にはこちらの書き方の方が好きなのと、後述するBroadcastなどを使うならこちらの書き方が非常に使いやすいと思います

結果

このように簡単に処理をブロードキャストしマージするようなことも書けます

また、この結果でもわかるように処理の順序は保証されています

今回は触れませんがこの他にも型に合わせて順序を変えたり、値でfilterをかけるなどもデフォで用意されているので簡単に書けますし、ビジネスロジックなどをいれてオレオレ実装をすることも可能です

 

Akka Streamsの実用について

これまで簡単な例で説明してきましたが実際に使うときにはどのように使えばよいのかを、Workerを作るときにどのような構成で行ったかを踏まえて簡単に説明したいと思います

今回実装したWorkerの処理を超簡単に分類すると

  • Queueからデータを取得する
  • 取得したデータを使ってIDを生成
  • 生成したIDをRedisにストアする
  • 結果をロギングする

まず、Sourceに何を置くかですがこの内容だとQueueからデータを取得する部分をSourceに置けると思います

これがその時の簡単なPublisherの実装になります。ちゃんとやるならハンドリング処理とか色々追加しないといけない部分はありますが今回は割愛しています

この時新しくでてきたのはtotalDemandとonNextですね

まずtotalDemandは現在Subscriberが欲しがっているリクエストの数です(後述の設定によって代わります)、この数値以上のリクエストを送ろうとするとExceptionを吐き送ることはできませんのでチェックが必要です

onNextですがこれは言葉の通りで次の処理へ移行するという意味ですので次の処理へDataを渡したということです

 

次にSubscriberですが今回は結果をロギングするを置きましたがロギング処理自体もFlowに入れてしまっても良いと思います

簡単に実装するとこのように書くことができます

requestStrategyというのはどのタイミングでSubscriberにリクエストを送るかの設定で、自ら実装することもできますし用意されているWatermarkRequestStrategyなどを使っても問題ありません。ちなみにこのWatermarkRequestStrategyは設定したhighWatermarkの数までlowWatermarkの数を切った時にリクエストを送るというもので、今回の場合4, 2で設定しているのでtotalDemandが1になった時に4までリクエストを送るという意味になります

 

PublisherとSubscriberが決まったところで間の処理になりますが残った

  • 取得したデータを使ってIDを生成
  • 生成したIDをRedisにストアする

の2つを1つのStageにしてしまうよりIDを生成するStage, IDをRedisにストアするStageにわけたほうがテストもしやすくモジューラビリティもあがるので良いと思います

またStageに関しては普通の関数で実装することもできますし、Stageをextendsしてクラスにすることもできますが今回使ってみて、どうしようもないとき(ある特定条件では下流に流さないなど)以外は基本的に関数で行ったほうがテストもしやすくFutureを使った場合も安全に書くことができるので良いのではないかなと思っています

Stageに関してはこのようにPOJOで書くことができます

 

では最後にこの1つ1つの処理を1つのFlowにして実行したいと思います

このように簡単に1つのFlowを作成することができます

Futureを返す関数の場合mapAsync or mapAsyncUnorderedを使う必要がありますが違いは関数名の通り順序を保証したいならmapAsyncを保証しなくていいならmapAsyncUnorderedの方を使います。mapAsyncの場合Futureの1つが遅かった場合すべての処理に影響してしまうので特に順序に意味がないような処理ならばmapAsyncUnorderedの方を使ったほうが良いと思います

 

Akka StreamsのFault Tolerance 

Akka Streams内で何かしらのExceptionを吐いた場合、PublisherのCancel, SubscriberのOnError(t: Throwable)にリクエストを送りその後エラーが起きたActorはStopされる。そのためその後処理を継続したとしてもStopしたActorへはDead Letterになってしまうので動作が継続できません。

また、Akka Streams内のActorを管理しているSupervisorに関しても現状口が開いておらずカズタマイズできない(※M4でflow全体のRestartやStopなどが追加されるみたいです)ので下記2つの方法のどちらかを取る必要があるかと思います

  • Flow内ではExceptionを絶対に起こさないようにハンドリングする(recoverするなど)
  • Flow内でExceptionが起きたらすべてのActorを停止して、Graph毎実行し直す

このようにあまり良いFault Toleranceの方法はないのが現状です

 

まとめ

今回のようなWorkerを作るときなどに背圧制御などが基本必要だと思いますがそういったことを意識しなくてもAkka Streams側がよしなにやってくれるのは非常に便利だと思いました。また、今回は使いませんでしたがBroadcastやMergeなどの便利機能、直感的に動作を理解できるFlowGraph、Akkaで実装されているのですべてをノンブロッキングで実装すればパフォーマンスに関しても非常に高速と個人的には至れり尽くせりな感じを受けました

しかし、まだライブラリはexperimentalと実験的なものなので未知のバグなどもあると思いますので使用の際は自己責任かつ最後まで面倒見れる方のみ使用したほうが良いと思います

最後になりますがReactive StreamsのJava9への一部導入の話しなどもあったり使ってみてすごく便利だと思ったので今後流行るのではないかなと個人的には思っていたりします

以上ここまでみてくださってありがとうございました