ヤフー株式会社は、2023年10月1日にLINEヤフー株式会社になりました。LINEヤフー株式会社の新しいブログはこちらです。LINEヤフー Tech Blog

テクノロジー

StormとKafkaによるリアルタイムデータ処理

こんにちは、鈴木カズです。

社内向けの監視システム構築のため、StormやKafkaを利用して開発を行っていました。
そのときの経験をもとに、まずStormによる実際のシステムがどんなものかということを紹介し、KafkaSpoutの処理内容、カスタマイズ方法、Stormのメッセージ処理などを説明したいと思います。

読者としては、StormやKafkaについて興味があり記事を読んだりしたことがあるがもう少し具体的な話を知りたい方、これから開発予定があるような方を想定しています。

StormとKafka

Stormは簡単に言うと、リアルタイムに流れてくる大量のデータを処理するための分散システムです。Twitterのメッセージの分析などに使うために開発されたようです。似たようなシステムとしてはS4やSparkがあります。大量のデータを処理するシステムというとHadoopが有名ですが、Hadoopは基本的にはディスクアクセスをベースとしたバッチ処理をするシステムで、リアルタイム処理には向いていません。

概論についてはインターネット上にいろいろスライドがあるので、こちらなどを参照してください。開発元で提供しているドキュメントは英語になりますが、こちらになります。

本稿では省略していますが、StormにはTridentという高レベルの抽象化をおこなった仕組みもあり、ジョイン、アグリゲート、グルーピング、ファンクション、フィルタといった高レベルの機能を利用して処理を行うことができます。

Kafkaはログなどの大量データをPub/Subモデルにより高速に中継するメッセージングシステムです。こちらも概要はスライドなどを参照してください。公式ドキュメントはこちらになります。

Kafkaはconsumerと呼ばれるクライアント側からデータをpullする設計になっていて、Stormと相性が良いため、両者は組みあわせて使われることが多くなっています。ただ、必ずしもKafkaの必要はなく、KestrelやRabbitMQなど他のデータソースと連携するSpoutも開発されています。

Stormでの処理内容

サーバー監視システムは、社内のサーバーからログやメトリック情報をFluentdを利用して収集し、いったんKafkaに貯めます。StromはこのKafkaに保存されたログを取得し、順次処理していきます。

Stormの処理の内容は簡単に言うと次のようなものです。

  • 異常判定ルールが流れてきたらDBに登録する
  • サーバーからのログやメトリック情報が流れてきたら異常判定ルールと比較する
  • 異常判定がされたら外部システムに通知する

規模感としては10万台オーダーのホストからログやメトリック情報を収集できる監視システムを目指しています。

Stormによるメッセージ処理の概略

Stormではspoutとboltというコンポーネントを組みあわせてトポロジーを構築します。

spoutは外部システムからメッセージを取得し、メッセージを処理するのはboltの役目になります。今回はboltを2種類作成しています。トポロジーではデータの流れる順序やデータの分配方法を指定でき、今回は下記のような構成にしました。

トポロジー

メッセージ配送先のコントロール

Stormではメッセージの分配方法がいくつかありますが、代表的なものは次の二つだと思います。

  • Shuffle grouping
    配送先のboltをランダムに選択します。
  • Fields grouping
    フィールドの値に応じて配送先のboltを決定します。

流れるデータの性質とその処理内容によって配送方法を選択する必要があります。ランダムにどのようなデータが来ても構わない場合はshuffle groupingを、データの局所性があり、同じキーを持つデータは特定のスレッドで処理したほうが都合が良い場合、そのキーをフィールドに設定してfields groupingを利用します。

できるだけオンメモリで処理するには一つのスレッドに保持するデータ量を減らす必要があります。ボルトを分割するのは、処理に必要な情報を限定できるようにすることが目的の一つになります。

一般的なプログラム開発では、単体のプロセスで処理を完了するものがほとんどだと思います。Stormで開発する場合、サーバー間で効率的に分散して処理を完了させるためには、どのように処理を複数のボルトに分割させればいいかを慎重に検討することが重要になります。

なお、図には記載されていませんが、実際には外部データベースとして、Hbaseも利用しています。定期的にメモリ上の情報をHbaseに保存し、プロセスが異常終了した場合でも処理を継続できるようにしています。

KafkaSpout

データの取得元はKafkaになるので、Stormと同じGitHubレポジトリにあるstorm-kafkaというモジュールを利用しています。以下の説明では、すべてstorm-0.9.2-incubatorというバージョンを前提にしています。

Kafkaのバージョンに注意

面倒なことに、Kafkaのバージョンが0.8でAPIが変更されました。このため自分が開発をはじめたころstorm-kafkaはkafka-0.8には対応していないという時期で、kafka-0.8を利用するにはstorm-kafka-0.8-plusというパッケージを使う必要がありました。今はStormとおなじGitHubのレポジトリでstorm-kafkaが管理されていて、これがkafka-0.8以降に対応しています。kafka-0.8以降を使っている場合は問題ありませんが、古いバージョンのKafkaを使っている場合は注意してください。

KafkaSpout概略

storm-kafkaで提供しているSpoutがKafkaSpoutです。

KafkaSpoutが接続するKafkaはトピックという単位でメッセージを管理します。主に性能向上のため、トピックは複数のパーティションに分割して持つことができます。冗長性を確保するためのレプリカ数も指定することができます。トピックの最新メッセージのオフセット情報はトピックのパーティションごとにZookeeperに記録します。Zookeeperにはトピックにパーティションがいくつあり、どのパーティションをどのサーバーが管理するかといった情報も保存されています。ZookeeperはStormやKafkaやHbaseのような分散システムで情報を共有したり、どのサーバーが生きているか管理するために利用できるシステムです。

Kafkaからデータを取得するためにはKafkaSpoutはまずZookeeperに接続しこれらの情報を取得します。KafkaSpoutはデータを取得し、boltにtupleを送信し、ackやfailを受けてどこまでデータを処理ずみであるかというオフセット情報をZookeeperに保存します。Zookeeper上のオフセット情報を更新するのは、KafkaSpoutがデータを取得したときではなく、トポロジー全体でデータの処理が完了した後になり、Kafkaからデータを取得したが途中で処理に失敗した場合にそのデータが未処理のままになるということはありません。

トポロジー内でメッセージの処理が時間内に完了しなかった場合や処理が途中でfailされた場合、そのメッセージはKafkaSpoutにより再送信されます。

KafkaSpoutのインターフェース

簡単にKafkaSpoutの処理を見ておきます。

ソースはこちらになります。

open()

ZookeeperからKafkaとのコネクション情報を取得します。

nextTuple()

Kafkaからメッセージをfetchして、boltにtupleをemitします。

KafkaSpoutでは_pendingとfailedというキューでメッセージを管理します。

nextTupleが呼ばれたとき、_pendingキューが空であれば、fill()という処理が呼ばれ、ここでfailedキューの内容が先頭に挿入され、Kafkaからメッセージをfetchします。_pendingが空でなければfill()は呼ばれず処理を続けます。基本的には_pendingからメッセージを一つづつ取りだしてemitします。

ConfigのmaxOffsetBehindで指定した数よりfailedのキューが増えたり、Kafkaの最新のオフセットから離れると、failedを無視し、Kafkaの最新オフセットからメッセージを取得するようになるので注意してください。デフォルトは10万になっています。

Kafkaからメッセージを取得する際はSpoutConf.setMaxSpoutPending()で設定した数だけfetchします。この数を増やせばKafkaへのアクセス頻度が減るため性能改善につながりますが、failedのキューが処理されるのはpendingキューに設定されたメッセージが空になったタイミングなのでfailedの処理頻度が低下する点に注意してください。

fail()

boltからfailが返されたときや、tupleの処理がtimeoutした場合にmsgidを引数に呼ばれます。再処理用にメッセージをfailedキューに入れます。

ack()

すべてのboltからackが返ったときにmsgidを引数に呼ばれます。pendingキューからメッセージを削除します。

commit()

nextTuple()の中やdeactivate()から呼ばれます。ただし前回更新時より指定した時間(デフォルトでは2秒)以上経過したとき。Zookeeperに書いたoffset情報(Stormでどこまで処理したか)を更新します。

MultiSchemeのカスタマイズ

KafkaSpoutが作成するtupleの中のフィールド情報は独自のScheme実装で調整しています。

Storm/SpoutでSchemeの役目はデータ取得元から取得したメッセージをどのようにStormに渡すかを規定することにあります。Stormはこの情報を利用してtupleの作成をおこなっています。

public interface Scheme extends Serializable {  
  public List<Object> deserialize(byte[] ser);  
  public Fields getOutputFields();  
}

deserialize()でバイト列の展開方法を実装し、getOutputFields()で展開されるフィールドの数や名前を伝えます。

今回のシステムではKafkaから取得したメッセージを分解して、フィールドを二つ持つようにする必要がありました。具体的には元のメッセージからキーにしたい文字列を取り出してフィールドを追加するようにdeserialize()とgetOutputFields()を実装しています。これにより、spoutでFields shuffling用のフィールドを追加することができます。StringKeyValueSchemeなどキーバリュー形式のメッセージを処理するSchemeもstorm-kafkaに入っていますが、Kafkaに投入するところから同じSchemeを利用しなければならず、既存のデータをそのまま使いたい場合には使えなかったのです。

YourOwnSchemeというSchemeを作成したとすると、spoutのConfig設定時に下記のように指定することで有効になります。

spoutConf.scheme = new SchemeAsMultiScheme(new YourOwnScheme());

Storm のメッセージ処理

Stormのメッセージ処理の公式なドキュメントはこちらになります。Stormメッセージ処理

ここではメッセージの処理あるいはタイムアウト関連で気がついたことなどをメモしておきます。

二つのメッセージング機構

プロセス間のメッセージ通信はNettyというライブラリを利用しています。以前はZeroMQというライブラリを利用していましたが、Nettyのほうが高速だということでデフォルトに変更されました。

プロセス内(スレッド間)のメッセージ通信はDisruptorというライブラリを利用しています。ドキュメントをざっと読んでみましたが、非常に高速なメッセージ通信を実現しているようです。

メッセージングに関するバッファサイズの調整

メッセージ通信に関してバッファサイズのチューニングについては、こちらの情報を参考にしました。

タイムアウトを検出するspout/boltの条件

spoutでtupleをemitするときに何らかのIDを一緒に渡す必要があります。
KafkaSpoutを使っていれば問題ありませんが、サンプルのspoutを作ったらタイムアウトしなくてびっくりした覚えがあります。

公式ドキュメントにも書いてあるのですが、なんらかの形で作成したIDをパラメタに付与する必要があります。

_collector.emit(new Values("field1", "field2", 3) , msgId);

Spoutのsampleプログラムにはタイムアウト処理を考慮する必要がないせいか、msgidを渡していないものが多いので、注意してください。

boltからemitする場合、最初にexecute()に渡されたtupleをemitの最初の引数に渡すことが必要です。これはanchoringと呼ばれています。逆にanchoringしなければこれ以降タイムアウト監視がされません。

_collector.emit(tuple, new Values(word));

タイムアウトの計測

Spoutがtupleをemitしてから、tupleのIDごとに経過時間がチェックされます。必要なすべてのackをタイムアウト時間までに受けとれなかったtupleはタイムアウト判定されます。個々のboltでの処理時間が短かくても、トータルの処理時間が長くなってタイムアウトすることがあるので注意が必要です。

タイムアウトを無効にする

topology.enable.message.timeouts: false
に設定すれば良さそうに思うのですが、自分の環境では無効にはできませんでした。

Stormのドキュメントに記載がある下記の方法でタイムアウトは無効にできるはずです。

  • Config.TOPOLOGY_ACKERSを0にする
  • Spoutでemitする際にmsgidを渡すのをやめる

タイムアウトしたメッセージをどう処理するか

個人的にStormで怖いのは、何らかの要因で処理が遅延してタイムアウトが頻発してしまうことです。サーバーのパフォーマンスは常にチェックすることは当然としても、処理の遅延が継続しないように、緊急時には例えばプライオリティに応じて何らかのスキップ機構を準備しておくことが必要だと思います。タイムアウトのときにどのような挙動になるかを把握するために、わざと時間がかかるboltを作成して動作をトレースしてみることをおすすめします。

秒間数万を超えるメッセージの処理が必要な環境では、現実問題としてタイムアウトしたメッセージを必ずしも対処できるかわかりません。タイムアウトメッセージが発生するのが十分短い時間であれば問題は少ないですが、大量にメッセージを再処理することになった場合、システムの処理能力を超えてしまうことも考えられます。最悪の場合、タイムアウトが連鎖して、まったく処理が進まなくなる事態に落ちいります。失敗したメッセージについてどこまで処理するか、十分に検討する必要があるでしょう。

タイムアウト時にどう対応するかは、spoutのfail()の実装に依存します。
KafkaSpoutではfail()したメッセージをキューに入れるので、再送されています。このあたりの挙動を変更したい場合はKafkaSpoutソースに手を入れるか、Stormでのタイムアウト判定を無効にして、独自にタイムアウトを判定し、タイムアウトしたメッセージの処理を別途組みこむ必要があります。

メッセージの順序性

トポロジーの設計でshuffle groupingを利用した場合、メッセージはランダムなboltで処理され、それぞれのboltでの処理の速度は予測できないため、メッセージの処理順序は保証できなくなります。また、メッセージの処理がタイムアウトしたり、failした場合には再送されますが、その場合はメッセージの順序が大きくずれてしまいます。メッセージの順序性を維持するにはコストがかかることを覚悟する必要があります。

メッセージの重複処理

KafkaSpoutでは処理したメッセージのオフセットをZookeeperにcommitするのはデフォルトでは2秒ごとのため、この間にプロセスが異常終了した場合は処理ずみのオフセット情報が失われ、同じメッセージを二重に処理する可能性があります。オフセット情報の更新頻度を上げることも可能ですが、その場合はZookeeperの負荷も上がり、全体の処理速度にも影響を与えるため、注意が必要です。

最後に

大規模なデータ処理というとまずはHadoopが思い浮びますが、最近はBigQuery、Spark、Storm、Imparaなど、より高速でリアルタイム性を持つようなシステムが注目を浴び、利用されるようになってきました。本稿が読者の何らかの役に立てれば幸いです。

こちらの記事のご感想を聞かせください。

  • 学びがある
  • わかりやすい
  • 新しい視点

ご感想ありがとうございました

このページの先頭へ