はじめに
こんにちは。システム統括本部プラットフォーム開発本部の栗原と申します。
2016年9月、Yahoo! Inc.からPulsarがOSSとして公開されました。Pulsarは高い配信信頼性/パフォーマンス/スケーラビリティを誇るPub-Subメッセージングシステムです。Yahoo! Inc.において開発され、これまでYahoo! Inc.のメール、ファイナンス、スポーツなど主要サービスにおいて、メッセージを高速かつ消失することなく配信するために利用されて来ました。
Pulsar開発者によって書かれた技術ブログ:Open-sourcing Pulsar, Pub-sub Messaging at Scaleによるとその規模は
- レプリケーションされるデータセンター数:10以上
- 一日あたりに生成されるメッセージ数:1000億以上
- Topic数:140万以上
と巨大でありながら、メッセージ生成の平均レイテンシは5[ms]以下を実現しています。
本稿ではこのPulsarの仕組みや特徴について紹介します。
Pub-Subとは
Pulsarは「Pub-Subメッセージングシステム」と述べましたが、まずはそれがどのようなものか説明したいと思います。
Pub-Sub(Publish and Subscribe)はメッセージ送受信のモデルの一つで、一般的に広く使われています。
図1. Pub-Subシステム
Pub-Subシステムにおいて、メッセージ送信/受信を行うクライアントのことをそれぞれPublisher/SubscriberまたはProducer/Consumerと呼びます。本稿ではProducer/Consumerの用語を使うこととします。
ProducerはTopicと呼ばれるエンドポイントに対してメッセージを投稿します。Consumerは購読しているTopicにメッセージが新たに投稿されると、それを受け取ります。ここでProducer対TopicおよびConsumer対Topicはどちらも多対多の関係になっています。
例えば図1において
- Producer1はTopic1に対してm1というメッセージを投稿し、それがConsumer1, Consumer2それぞれに配信されます。
- Producer1はTopic2に対してm2というメッセージを投稿し、それがConsumer2に配信されます。
- Producer2, Producer3はTopic3に対してそれぞれm3, m4というメッセージを投稿し、その両方がConsumer3に配信されます(このときm3, m4のどちらが先に配信されるかはそれぞれの生成時間やPub-Subシステムの実装によります)。
ここでProducerのメッセージ送信とConsumerのメッセージ受信は必ずしも同期的に行われる必要はありません。典型的には投稿されたメッセージを一時的に何らかのキューに入れておき、Consumerが接続されたタイミングでそれを配信します。またProducerはメッセージ送信先のTopic名のみを知っていればよく、どのConsumerがそれを受信するかを知る必要はありません。同様にConsumerはメッセージ受信元のTopic名のみを知っていればよく、どのProducerがそれを送信したかを知る必要はありません。
以上のようにPub-Subシステムには
- ProducerとConsumerが非同期にメッセージをやりとりできる
- ProducerとConsumerが疎結合になっており、それぞれの追加や拡張が容易である
という性質があります。そしてこれらの性質はProducer/Consumerの数が多い大規模なシステムにおいて非常に有効に働きます。
用語の定義
次に、Pulsarの中で用いられるいくつかの用語を定義しておきます。
Cluster
物理的あるいは論理的に区切られたPulsarのシステム単位です。典型的には地域(リージョン)ごとにClusterが存在します。例えば東京、大阪、名古屋にそれぞれ東京Cluster, 大阪Cluster, 名古屋Clusterがあるようなイメージです。
Property
Pulsarにおける利用者の識別子です。ここでの「利用者」とはPulsarを利用するクライアント(Yahoo! Inc. の例ではメール、ファイナンス、スポーツなどの各アプリケーション)のことを指します。Pulsarはマルチテナント型のサービスとして設計されており、一つのClusterに複数のPropertyが同居できるため、PropertyごとにClusterを立てる必要はありません。
Namespace
Property内におけるTopicの管理単位です。利用者は自分のPropertyの下に任意に複数のNamespaceを作成し、それぞれに対して権限などの設定を持つことができます。
Topic
既に説明した通り、メッセージが送受信されるエンドポイントのことをTopicと呼びます。利用者は自分のNamespaceの下に任意に複数のTopicを作成できます。Pulsarにおいて、Produce/Consumeする対象のTopicを指定する際には、次のような形式で指定します:
persistent://property/cluster/namespace/topic
property
, cluster
, namespace
の部分にはそれぞれ上述したProperty, Cluster, Namespaceの名称が入ります。persistent
の部分はメッセージの配信ポリシーを表しています。2016年9月時点では
persistent
:そのTopicを購読しているすべてのConsumerからACK(Acknowledgement:メッセージの受け取りが正常に終了したことを伝える応答)が返されるまでメッセージを保持する方式
のみがサポートされていますが、将来的には
non-persistent
:メッセージを確実に届けることよりも速度を重視し、ACKの返却を必須にしない方式
のサポートも検討されています。
Subscription
ConsumerがTopicの購読を開始する際に作成されるリソースで、Cursor:メッセージをどこまで読んだかの情報を持っています。また、メッセージの受け取り方を表すSubscription Typeというものがあり、Exclusive, Shared, Failoverの3つの内から指定できます。各モードの詳細についてはsubscription-modesを参照してください。
システム構成
続いてPulsarのシステム構成について説明します。
図2. Pulsar Cluster
Pulsarの一つのClusterはメッセージの送受信を仲介するBroker, メッセージを保存するBookKeeper, メタ情報や設定を保持するZooKeeperから構成されています。
Broker
Brokerはその名の通りメッセージ送受信を仲介するコンポーネントで、以下の3つの機能を提供します:
Topicにおけるメッセージの配信
各TopicはCluster内のBrokerのどれか一つに割り当てられます。また一つのBrokerには複数(場合によっては数千単位)のTopicを割り当てることができます。Brokerは担当するTopicにメッセージの投稿があった際、それをストレージ(BookKeeper)に保存しつつ、購読しているConsumerに送信します。
他ClusterへのTopicのレプリケーション
Pulsarでは複数のCluster間でTopicをレプリケートできます。レプリケーションの設定が有効になっているTopicに対して投稿があると、Brokerは非同期にレプリケート先Clusterへメッセージを送信します。利用方法はGeoReplicationを参照してください。
設定管理API
Property, Cluster, Namespaceの作成や権限設定などを行うREST APIを提供します。
BookKeeper
Pulsarではメッセージを保存するストレージとしてApache BookKeeperを利用しています。ソフトウエアの名称としてはBookKeeperですが、実際にメッセージを保存しているコンポーネントのことをBookieと呼びます。BookieにはメッセージそのものとSubscriptionごとのCursorの情報が保存されます。メッセージはすべてのConsumerからACKが返されるまで削除されないため、もしも障害発生などの理由によりConsumerがメッセージを受け取れなかった場合は、再送することでリカバリができます。
BookKeeperは以下の特徴を持ちます:
- 一貫性を保証
- スケーラブル
- 大量のメッセージの同時読み書きが可能
これらはPulsarにおける配信の保証と高スループット/低レイテンシの実現に重要な役割を果たしています。
ZooKeeper
Pulsarでは設定情報の保持にApache ZooKeeperを利用しています。Pulsarにおいては下記の2種類のZooKeeperプロセスが存在します:
- Global ZooKeeper:全てのClusterで共通の設定情報(Property, Namespaceなど)を保持
- Local ZooKeeper:個々のClusterごと特有の設定情報(Broker, BookKeeperのメタ情報など)を保持
典型的にはLocal ZooKeeperを構成するサーバーの一部にGlobal ZooKeeperを兼任させます。
特徴
以下にPulsarの主な特徴をまとめます:
- マルチテナント
- 配信保証
- 高スループット/低レイテンシ
- スケーラブル
- Cluster間レプリケーション
Hello World
それではGettingStartedを参考に、実際にPulsarを動かしてみましょう。
パッケージの入手
https://github.com/yahoo/pulsar/releases から最新版のpulsar-VERSION-bin.tar.gzをダウンロードし、展開します。執筆時点でのバージョンは1.14でした。
$ tar xvfz pulsar-1.14-bin.tar.gz
$ cd pulsar-1.14
Standalone serverの起動
PulsarにはStandalone(Broker, BookKeeper, Zookeeperが一つのサーバー上ですべて起動する)モードが用意されています。実際に運用する際はそれぞれ別のサーバーを立てることになると思いますが、今回はお試しですのでこれを利用することにします。展開したディレクトリのbin
配下に起動スクリプトが用意されています。これに引数としてstandalone
を渡すことで、Standalone serverが起動します:
$ bin/pulsar standalone
pulsar-clientでの動作確認
pulsar-client
というコマンドが用意されているので、これを使ってProduce/Consumeの動作を確認してみましょう。Standalone serverを起動するとsample/standalone/ns1
というNamespaceが作成されているので、このNamespace上のmy-topic
というTopicに対してProduce/Consumeを行います。
Consumerを起動します:
$ bin/pulsar-client consume -s 'sub' 'persistent://sample/standalone/ns1/my-topic'
別のターミナルなどでProducerを起動し、メッセージを送信します:
$ bin/pulsar-client produce -m 'hello' 'persistent://sample/standalone/ns1/my-topic'
Consumer側にメッセージが表示されます:
----- got message -----
hello
サンプルコードを動かしてみる
次は実際にJavaのサンプルコードを動かしてみましょう。
HelloProducer.java:
package pulsar.hello;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.Producer;
class HelloProducer {
public static void main(String[] args) throws Exception {
try (PulsarClient client = PulsarClient.create("http://localhost:8080")) {
Producer producer = client.createProducer("persistent://sample/standalone/ns1/my-topic");
for (int i = 0; i < 10; i++) {
// メッセージを送信する
producer.send(String.format("my-message-%d", i).getBytes());
}
}
}
}
HelloConsumer.java:
package pulsar.hello;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.Message;
class HelloConsumer {
public static void main(String[] args) throws Exception {
try (PulsarClient client = PulsarClient.create("http://localhost:8080")) {
Consumer consumer = client.subscribe("persistent://sample/standalone/ns1/my-topic", "my-subscribtion-name");
for (int i = 0; i < 10; i++) {
// メッセージを受信する
Message msg = consumer.receive();
System.out.println("Received message: " + new String(msg.getData()));
// BrokerにACKを返す
consumer.acknowledge(msg);
}
}
}
}
Mavenを利用していることが前提ですが、ビルドする際には以下をpom.xmlのdependencyに追加してください(versionは適宜変更してください)。
<dependency>
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>1.14</version>
</dependency>
ビルドしてConsumer → Producerの順に起動するとConsumerがメッセージを受信する様子が確認できます。
Received message: my-message-0
Received message: my-message-1
Received message: my-message-2
Received message: my-message-3
Received message: my-message-4
Received message: my-message-5
Received message: my-message-6
Received message: my-message-7
Received message: my-message-8
Received message: my-message-9
おわりに
本稿ではPulsarの紹介を行いました。
Pulsarは配信信頼性/パフォーマンス/スケーラビリティに優れるPub-Subメッセージングシステムで、Yahoo! Inc.の主要サービスで利用されている実績があります。またYahoo! JAPANにおいてもメールやコンテンツ入稿システムなどのサービスへの導入が検討されています。
今後の取り組みとしてはメッセージ生成にかかるレイテンシのさらなる改善(99.9パーセンタイルが5[ms]以下になるようにする)やJava以外の言語のサポートなどが挙がっています。加えてYahoo! JAPANではKerberosやSASLフレームワークにのっとった方式など、さまざまな認証認可プラグインの追加を検討しています。
※ Pulsarは任意の認証認可の仕組みをプラグインとして組み込めるように設計されていますが、現時点でのプラグインの実装はTLSクライアント認証のみとなっています。具体的な利用方法についてはAuthentication, Authorizationを参照してください。
もともとはYahoo! Inc.内に閉じていた技術であったPulsarですが、社外のさまざまな開発者に使ってもらい多くのフィードバックを得る目的で、この度OSS化されることになりました。筆者自身、社外の先進的な技術・知識に触れ、Pulsarをさらに進化させていくことに期待とやりがいを感じています。まだまだ公開されたばかりのOSSではありますが、どんどん機能を追加しつつコミュニティーを盛り上げて行きますので、ぜひPulsarを試してみてください!
緊急告知
2016年11月4日(金)にYahoo! JAPAN MeetUp #3【Pulsar公開記念特別編】を開催します!
Yahoo! Inc./Yahoo! JAPANのエンジニアが登壇し、Pulsarとその利用事例などについて講演を行います。また講演後には懇親会(軽食/お酒あり!)が予定されていますので、議論や交流を深める場としてご活用いただければと思います。Pulsarのことが気になり始めた方々、ぜひご参加ください!
お申込みは次のURLからお願い致します:
http://yj-meetup.connpass.com/event/41931/
参考
こちらの記事のご感想を聞かせください。
- 学びがある
- わかりやすい
- 新しい視点
ご感想ありがとうございました