Yahoo! JAPANエンジニアの福盛です。Yahoo! JAPANアプリの通知系バックエンドシステムを主に担当しています。
今回は、Apache Pulsarを使ったMessage Queue(以下、MQ)システムが実際のアプリケーションバックエンドでどのように活用されているかの例をご紹介したいと思います。
Apache PulsarについてはYahoo! JAPAN Tech Blogの記事「メッセージングPF「Apache Pulsar」の使い方(入門編)」でも紹介されています。こちらもぜひご参照ください。
通知システムでのApache Pulsarの活用
Yahoo! JAPANアプリ通知システムの全体像
Yahoo! JAPANアプリはiOS, Android端末を対象にPush通知を行っています。当初はニュース、災害情報の通知を中心に行うシステムでしたが、最近では社内外の各サービス、例えばヤフオク!、Yahoo!カード、PayPayなどに関する通知も行うようになっています。
これら各サービスとの連携を本格的に進めるためのシステム構築プロジェクトは2018年頃に始まりました。システムはScalaで実装されており、この頃から本格的に利用できるようになったApache PulsarとPaaS(Platform as a Service)の組み合わせで構築しています。
MQには入り口につながる”Producer”と出口につながる”Consumer”の2つのクライアントがあります。Yahoo! JAPANアプリ通知システムではProducerとConsumerは以下のような役割を持っています。
- Producer:APIや他のMQなどをデータソースにして「通知用MQ」にメッセージを送信する
- Consumer:「通知用MQ」からメッセージを受信し、通知送信や「お知らせ」画面表示データの書き出しを行う
各サービスからのリクエストは「荷物が到着した」「質問への返信が到着した」といったリアルタイムイベントとして、あるいは「月の利用料のお知らせ」などの定期イベントとして、APIや他のMQを経由してProducerにより送信されます。MQのメッセージは
- 社内プラットフォーム(Push Platform)を使い通知を送信するConsumer (Push Consumer)
- 「お知らせ」画面表示のデータをDB(Apache Cassandra)に書き出すConsumer (List Consumer)
に届けられ、Yahoo! JAPANアプリからの通知と「お知らせ」画面の表示に使われます。
ちなみに、List ConsumerからApache Cassandraへ書き出されたデータがどのようにして「お知らせ」画面に反映されるかについては、昨年のYahoo! JAPAN Tech Blogの記事「チームのスキル向上にもつながるシステム刷新 〜 Yahoo! JAPANアプリ「お知らせ」機能の開発事例」でも紹介しました。もし興味があればこちらの記事も参照ください。
ScalaでApache Pulsarを使う
Apache Pulsarにはさまざまな言語向けにクライアントライブラリが提供されています。ScalaからはJava用のライブラリを利用することもできるので、それを直接使ってもよいのですが、ラップしたライブラリを用意し、これを使っています。
// 初期化コードでProducerを構築
val srPulsarClient = Client(srPulsarConfig, pulsarAthenzAuthConfig)
...
// 送信用messageをProducerから送信
srPulsarClinet.producer.send(message)
// 初期化コードでConsumerを構築
val pulsarClient = Client(pulsarConfig, pulsarAthenzAuthConfig)
val consumer = pulsarClient.consumer(
"push-consumer",
SubscriptionType.Shared,
pulsarClient.consumerBuilder.ackTimeout(PulsarAckTimeout, TimeUnit.SECONDS)
)
...
// Consumerからメッセージを取得し処理を行う。
consumer.foreach { message =>
Try {
// メッセージごとの処理はprocessMessageメソッドに渡され、そこで処理される
processMessage(message)
}.recover { ... }
consumer.acknowledge(message)
}
Message Queueのメッセージフォーマットについて
MQ上を流れるデータを送ったり受け取ったりするにはメッセージフォーマットがきちんと定義されていることが必要です。
Yahoo! JAPANアプリ通知システムでは、通知送信とお知らせ画面の表示の両方をカバーするようなフォーマットをProtocol Buffers形式で定義しています。設計時にメッセージフォーマットとしてJSONを利用することも検討していましたが、送信の効率およびメッセージフォーマットの管理の観点から、最終的にこちらを選択しました。
定義のイメージは例えばこんな感じです。
...
/* PushRequest represents a requesting message of a push notification. */
message PushRequest {
enum UserIdType {
...
}
enum Behavior {
PUSH_AND_LIST = 0;
LIST_ONLY = 1;
PUSH_ONLY = 2;
}
enum OsType {
ALL = 0;
IOS = 1;
ANDROID = 2;
}
...
repeated string user_id = 1;
optional UserIdType user_id_type = 2;
optional string service_name = 3;
optional Behavior behavior = 4;
...
これをprotoc
コマンドでJavaのクラスに変換し、コード内で利用しています。
アプリケーションログを活用して状況把握
Webアプリケーションなどと違い、クライアントからのリクエストやレスポンスを確認できないため、MQを使ったアプリケーションは中で何が起こっているかを把握するのが一見難しいように見えます。しかしアプリケーションログを積極的に出力し、これを確認することでMQによるアプリケーションでも動作の様子を確認しながら開発、さらにリリース後の動作状況の把握、トラブルシュートが容易にできるようになります。
例えばMQからのメッセージを受信した後、その内容をアプリケーションログに出力するにはこのように書きます。
class Main(
...
) extends Logging {
...
def parse(m: Message[Array[Byte]]): Try[PushRequestMessage] = {
...
val pushRequestMessage = PushRequest.parseFrom(m.getData).toScala
logger.info("message parsed. pushRequest={}", pushRequestMessage)
...
アプリケーションログには以下のような内容が出力されます。(イメージ)
time:2020-12-07T16:29:07.652+09:00 level:INFO (...) message parsed. pushRequest=PushRequestMessage(...,Some(PushAndList),Some(serviceName),Some(メッセージが届きました),Some(https://...),Some(https://...),...)
アプリケーションログの確認、検索にはSplunkを活用しています。これについてはぜひ別の機会に紹介したいと思います。
MQの配信速度調整で後段の負荷をコントロールする
ここでPulsarの機能を活用した応用的な活用例を一つ紹介します。
Push通知はタップするとアプリ画面に移動します。移動先は連携先サービスによってさまざまですが、通知は数百万から数千万人を対象に発行されることがあるため、短時間で多くのユーザーに通知が届くと、そこから大量のアクセスが発生する場合があります。連携先によってはアクセスが集中するとその負荷に対応できないといった問題も発生します。
このような状況を防ぐには、Pulsarの配信速度調整機能が有効です。
通知システムでは高速・低速のキューを用意し
- 遷移先の処理能力が十分にある場合=高速
- 遷移先の処理能力に制約がある場合=低速
と使い分けています。
低速キューではリアルタイム性がそれほど要求されないが配信数が数千万人向けとなるメッセージを数時間かけて送信します。この例では、11時からお知らせの配信を開始していますが、メッセージはキューにいったん格納され、その後17時までかけてゆっくりと消化している様子を見ることができます。
油断大敵! トラブル事例
メリットも多く、通知システムの中心的役割を果たしているMQですが、トラブル発生時の対応には特に注意が必要です。
かつてあった事象として、MQ上のメッセージのうち1件が想定外のエラーで受信処理を完了しなかった(が、その後のメッセージは正常に処理されている)状況となった際、MQのキューの容量だけが消費され、その後時間をおいて新規メッセージの受け付けができなくなるというものがありました。
最終的に、
- コード内にメッセージ処理時の想定外エラーへの対処漏れがあったので、これを修正(主原因)
- MQのキューの容量を監視し、一定量以上の状態が続いた場合にはアラートを出す
という対応を行いました。以後は安定した運用を1年以上継続しています。
さいごに
以上、Apache Pulsarを使ったMQのScalaでの利用とその活用についてご紹介しました。
これまでの2年弱の運用を通じて「システムの疎結合化」「大規模なトラフィックへの対応」にMQが大きな威力を発揮していることを日々実感しています。開発・運用でのアプリケーションログの活用など、今回書ききれなかったトピックもありますが、これらについてはぜひ別の機会にご紹介したいと思います。それでは、また!
こちらの記事のご感想を聞かせください。
- 学びがある
- わかりやすい
- 新しい視点
ご感想ありがとうございました
- 福盛 秀雄
- エンジニア