テクノロジー

2020.09.24

メッセージングPF「Apache Pulsar」の使い方(サーバー編)

こんにちは。ヤフー株式会社システム統括本部の坂本です。現在、私はオープンソースのメッセージングミドルウェアであるApache Pulsarを社内向けの共通プラットフォームとして提供・運用するチームに所属しています。

私たちのチームでは、Pulsarを紹介する記事を過去4回にわたって連載してきました。過去の記事については以下をご覧ください。

  1. メッセージングPF「Apache Pulsar」の使い方(入門編)
  2. メッセージングPF「Apache Pulsar」の使い方(クライアント編)
  3. メッセージングPF「Apache Pulsar」の使い方(クライアント編2)
  4. Pulsar Summitのセッションと、ヤフーの発表内容紹介

さて、5回目となる今回は、Pulsarのサーバーサイドに関する記事です。

Pulsarには、複数のコンポーネントを1つのJVMプロセスとして手軽に起動できるstandaloneモードが存在しており、これまでの記事ではこのstandaloneモードを用いてPulsarの基本的な使い方を説明してきました。しかし、standaloneモードはあくまでデモンストレーション用といった位置付けであり、プロダクション環境でPulsarをこのモードで稼働させる事はあまりありません。

可用性やパフォーマンスの向上のために、プロダクション環境ではPulsarを構成する各コンポーネントを複数のホスト上で個別のJVMプロセスとして起動し、Pulsarのクラスターを構築する必要があります。本稿では、その具体的な方法をご紹介していきたいと思います。

なお、本稿ではPulsarのバージョン2.6.1を基に解説を行います。

ヤフーとPulsarの関わり

私の所属するチームはPulsarがOSS化される2016年9月頃から携わっており、Pulsarのコミッターも複数名在籍しています。このチームではヤフーの各プロダクト向けにPulsarを運用しつつ、社内における需要・事例をもとに機能拡張・バグ修正などの開発を通じたOSSへの貢献を行っています。

詳細については別記事を投稿しているのでこちらもご覧ください。

Pulsarを構成するコンポーネント

第1回の記事で解説したように、Pulsarのサーバーサイドには複数のコンポーネントが存在します。Pulsarのデプロイ方法を説明する前に、Pulsarを構成する各コンポーネントについて簡単におさらいしておきます。

components図1. Pulsarを構成するコンポーネント

Broker

ProducerとConsumerの間のメッセージのやりとりを仲介する役割を持つサーバーです。それに加えて、テナントやネームスペースの作成・設定が可能なREST APIも提供しています。

Bookie

オープンソースのストレージシステムであるApache BookKeeperのサーバーです。Brokerは、Producerからトピックに送信されたメッセージをBookieに書き込んで永続化します。永続化されたメッセージは、そのトピックを購読しているConsumerによって受信されるまでBookieに保存されます。

ZooKeeper

BrokerとBookieは、共にメタデータの管理のためにオープンソースのメタデータストアであるApache ZooKeeperを使用しています。Pulsarでは、ZooKeeperを次の2通りの用途で使用します。

  • Local ZooKeeper
    • それぞれのPulsarクラスターで独立した情報(Broker/Bookieのメタデータやトピックの統計情報など)を管理します。
  • Configuration Store(Global ZooKeeper)
    • 複数のPulsarクラスターで共有する必要のある情報(テナントやネームスペースの設定情報など)を管理します。

各コンポーネントのデプロイ方法

それでは、ここからは各コンポーネントのデプロイ方法を解説していきます。それに併せて、それぞれのコンポーネントの主要な設定項目もご紹介しようと思います。

前章で説明したように、Brokerが動作するにはBookieとZooKeeperが必要であり、Bookieが動作するにはZooKeeperが必要です。つまり、コンポーネントをデプロイする順番は、

  1. ZooKeeper
  2. Bookie
  3. Broker

でなければなりません。

なお、今回の解説はそれぞれのコンポーネントのサーバー1つを個別の物理マシンまたは仮想マシンにデプロイする事を想定したものです。

準備

以下の準備を、Pulsarのコンポーネントをインストールする全ての環境(ホスト)で実施してください。

Java 11のインストール

Pulsarを構成する全てのコンポーネントはJavaで実装されており、動作環境にはあらかじめJava 11をインストールしておく必要があります。

JDKのバイナリは任意のものを使っていただいて構いませんが、例としてOracleが提供しているOpenJDKのリファレンス実装は次のページからダウンロードできます。

https://jdk.java.net/

Pulsarのバイナリパッケージのダウンロード

下記のコマンドでPulsar 2.6.1のバイナリパッケージをダウンロードしてください。このパッケージにはPulsarの全てのコンポーネントのバイナリが含まれます。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/apache-pulsar-2.6.1-bin.tar.gz

パッケージのダウンロードが完了したら、下記のコマンドでパッケージを解凍・展開します。

$ tar -xvzf apache-pulsar-2.6.1-bin.tar.gz

展開されたパッケージには、下記のディレクトリが含まれています(または、コンポーネントが稼働を開始するタイミングで作成されます)。

ディレクトリ 含まれるファイル
bin pulsarpulsar-admin といったコマンドラインツール
conf 各コンポーネントの設定ファイル
data BookKeeperとZooKeeperがデータを保存するディレクトリ
lib Pulsarによって使用されるJARファイル
logs ログファイル

ZooKeeperのデプロイ

必要なサーバー数

ZooKeeperはサーバー単独で稼働させる事も可能ですが(standaloneモード)、特にプロダクション環境では複数のサーバーを稼働させ、それらを協調させてサービスを提供するのが一般的です(replicatedモード)。こうしたZooKeeperのサーバーのグループはアンサンブルと呼ばれ、全てのサーバーが同じデータの複製を持っています。

ZooKeeperをreplicatedモードで稼働させる場合、サーバーの数は奇数とする事が推奨されています。つまり、最低でも3台のサーバーを起動する必要があります。

なぜ2台や4台といった偶数ではいけないのでしょうか。これは、ZooKeeperのアンサンブルはその構成サーバーの過半数が稼働していないとサービスを提供できないためです。ZooKeeperのアンサンブルでは、1台のサーバーがリーダー、それ以外のサーバーがフォロワーとなり、リーダーはクライアントから送られてくる更新リクエストを取りまとめてデータの整合性を保つ役割を果たします。こうしたリーダー権限を行使するには、アンサンブルの過半数のサーバーからの支持を得なければなりません。この過半数のサーバー群はクォーラムと呼ばれます。

重要なのは、必要なサーバーの数は「半分以上」ではなく「過半数」である事です。例えば2台のサーバーでアンサンブルを構成した場合、クォーラムは2台となり、ダウンが許容されるサーバーの数は0台でstandaloneモードと変わりません。それどころか、単一障害点が1つから2つに増えている分、standaloneモードよりも可用性は低下してしまいます。

サーバーの起動方法

ZooKeeperのサーバーを起動するには、アンサンブルを構成する全てのサーバーのホスト名を設定ファイル conf/zookeeper.conf に記述する必要があります。以下は3台のサーバーでアンサンブルを構成する場合の記述例です。

server.1=zk1.pulsar.yahoo.co.jp:2888:3888
server.2=zk2.pulsar.yahoo.co.jp:2888:3888
server.3=zk3.pulsar.yahoo.co.jp:2888:3888

server.N のNの部分は各サーバーに割り当てるIDです。ホスト名の後についている 28883888 はTCPのポート番号です。2888 はリーダーとフォロワーのトランザクションのやりとりに使用され、3888 は新しいリーダーを選出する際に使用されます。

続いて、各ZooKeeperサーバーで data/zookeeper ディレクトリを作成し、その中の myid ファイルに自身に対応するID(server.N のNの部分)を記述してください。以下はIDが1である zk1.pulsar.yahoo.co.jp におけるコマンドの例です。

$ mkdir -p data/zookeeper
$ echo 1 > data/zookeeper/myid

最後に、次のコマンドでZooKeeperサーバーのプロセスをデーモンとして起動します。全てのホストでサーバーを起動できたらZooKeeperのデプロイは完了です。

$ bin/pulsar-daemon start zookeeper

設定項目

server.N 以外にもZooKeeperにはさまざまな設定項目が存在しますが、ほとんどはデフォルトのままで問題ありません。

項目名 説明 デフォルト値
tickTime ZooKeeperにおける基本的な時間の単位をミリ秒で指定 2000
initLimit フォロワーがリーダーに接続して初回の同期を行う際のタイムアウト時間をtickTimeの個数で指定 10
syncLimit フォロワーがリーダーに同期する際のタイムアウト時間をtickTimeの個数で指定 5
dataDir インメモリのデータベースのスナップショットや更新のトランザクションログが保存されるディレクトリ data/zookeeper
clientPort ZooKeeperのサーバーがクライアントからの接続をリッスンするポート 2181
autopurge.snapRetainCount dataDirに保持するスナップショットとトランザクションログの個数 3
autopurge.purgeInterval 古いスナップショットとトランザクションログを削除する間隔を時間単位で指定 1
maxClientCnxns 同時に接続可能なクライアントの最大ソケット数 60

Configuration Storeについて

さて、既に述べたように、PulsarではZooKeeperをLocal ZooKeeperとConfiguration Storeという2通りの用途で使用します。複数のPulsarクラスターを構築してそれらを統合したい場合(地理的に離れた複数のデータセンターが存在する場合、1つのデータセンターを1つのクラスターとするのが一般的です)、Local ZooKeeperとConfiguration Storeは別々のアンサンブルとする必要があります。つまり、2種類のZooKeeperサーバーのプロセスを起動させなければなりません。

しかし、今回は説明を簡単にするため構築するクラスターは1つだけとし、Local ZooKeeperとConfiguration Storeを同じアンサンブルとします。Pulsarをマルチクラスター構成にしたい場合には、下記の公式ドキュメントを参考にしてください。

https://pulsar.apache.org/docs/en/deploy-bare-metal-multi-cluster/

クラスターメタデータの初期化

ZooKeeperのデプロイが完了したら、PulsarクラスターのメタデータをZooKeeperに書き込んでおく必要があります。この作業はクラスターの新規構築時に一度だけ実行すれば大丈夫です。

メタデータの初期化に使用するコマンドもPulsarのバイナリパッケージに含まれています。次のようなコマンドを任意のZooKeeperサーバー1台で実行してください。

$ bin/pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster-1 \
  --zookeeper zk1.pulsar.yahoo.co.jp:2181 \
  --configuration-store zk1.pulsar.yahoo.co.jp:2181 \
  --web-service-url http://broker.pulsar.yahoo.co.jp:8080 \
  --web-service-url-tls https://broker.pulsar.yahoo.co.jp:8443 \
  --broker-service-url pulsar://broker.pulsar.yahoo.co.jp:6650 \
  --broker-service-url-tls pulsar+ssl://broker.pulsar.yahoo.co.jp:6651

それぞれのコマンドラインオプションの意味は次の通りです。

オプション 説明
--cluster クラスターの名前
--zookeeper 任意のLocal ZooKeeperサーバー1台のホスト名とポート番号
--configuration-store 任意のConfiguration Storeサーバー1台のホスト名とポート番号
--web-service-url クラスターのHTTPサービスのURL
--web-service-url-tls クラスターのHTTPSサービスのURL(HTTPSを使用しない場合は不要)
--broker-service-url クラスターのBrokerサービスのURL。URLのスキームは http ではなく pulsar を指定
--broker-service-url-tls クラスターのTLS BrokerサービスのURL。URLのスキームは https ではなく pulsar+ssl を指定(TLSを使用しない場合は不要)

なお、上記のコマンド例はクラスター内に存在する複数のBrokerサーバーに到達できる共通のドメイン(ここでは broker.pulsar.yahoo.co.jp がそれに当たります)を用意できる事を前提としたものです。そうしたドメインの用意が困難な場合には、次のような指定方法で複数のBrokerサーバーのホスト名を列挙する事も可能です。

--web-service-url http://host1:8080,host2:8080,host3:8080 \
--web-service-url-tls https://host1:8443,host2:8443,host3:8443 \
--broker-service-url pulsar://host1:6650,host2:6650,host3:6650 \
--broker-service-url-tls pulsar+ssl://host1:6651,host2:6651,host3:6651

Bookieのデプロイ

必要なサーバー数

ZooKeeperと同じく、BookKeeperも複数のBookieサーバーを構築して冗長化を行います。Bookieサーバーの台数を決定するためには、BookKeeperにおけるデータの保存の仕組みを理解しておく必要があります。

BookKeeperは、ログのようなシーケンシャルなデータをストレージに永続化する機能を提供するミドルウェアです。そうしたデータ1つ1つは、BookKeeperではEntryと呼ばれます。また、連続したEntryのまとまりはLedgerと呼ばれます。BookKeeperはこのLedgerを複数保存できます。言い換えれば、複数の独立したデータストリームを保存可能です。

Pulsarにおいては、トピックのメッセージがBookKeeperのEntryに当たります。あるトピックはオープン状態のLedger 1つとひもづいており、Producerからトピックに送信されたメッセージはそのLedgerにEntryとして追加されていきます。そしてLedgerのEntry数がある程度の数に達したら、そのLedgerはクローズされて新しいLedgerが作成されます。

topic_ledger_entry図2. トピック・Ledger・Entryの関係

さて、各Ledgerはメタデータとして次の3つのパラメーターを持っています。

パラメーター 記号 説明
アンサンブルサイズ E Ledgerを保存するのに使用されるBookieの数
Writeクォーラムサイズ Qw 各Entryが書き込まれるBookieの数
Ackクォーラムサイズ Qa 各Entryの書き込みが完了した事が保証されるBookieの数

3つの数値の大小関係はE >= Qw >= Qaとなっています。以降は、これら3つの数値の意味を具体例を挙げつつ見ていこうと思います。

例として、E = 4、Qw = 3、Qa = 2のケースを考えます。新しいLedgerが作成されると、クラスターに存在するBookieの中から4つが選択されます。ここでは、B1・B2・B3・B4という4つのBookieが選択されたとします。しかし、全てのEntryが4つのBookie全てに書き込まれるわけではありません。Qw = 3の場合、各Entryは4つのBookieの内の3つに書き込まれます。具体的には、書き込み先のBookieは次の表のようになります。

Entryの番号 書き込み先のBookie
0 B1, B2, B3
1 B2, B3, B4
2 B3, B4, B1
3 B4, B1, B2
4 B1, B2, B3

persistence_policies図3. 各Entryが書き込まれるBookie

ただし、これは3つのBookieにEntryの複製が確実に存在する事を意味してはいません。Qa = 2の場合、2つのBookieへの書き込みが完了した時点でそのEntryの永続化は成功したと見なされるためです。2つのBookieにデータが保存されていれば、その内1つのBookieが障害でダウンしたりデータが消えたりしても、もう一方のBookieから同じデータを読み込む事ができます。

クラスター内のBookieの総数がEより少ないとLedgerの作成ができないため、Bookieの総数はE以上でなければなりません。また、クラスターはQa - 1台までのBookieの障害に耐える事ができます。したがって、Bookieの総数はE + Qa - 1以上にするのがいいでしょう。

デフォルトの設定では、E・Qw・Qaの値は全て2であるため、Bookieは少なくとも3台あればよい事になります。

ハードウェア構成

BookKeeperは、ログ先行書き込み(WAL)という仕組みを採用しています。Producerから送信されたメッセージは、その「書き込み操作」の内容が最初にジャーナルと呼ばれるディスク領域に書き込まれます。ジャーナルへのデータの永続化が完了した時点で、BrokerはProducerにメッセージの送信が成功した事を通知します。その後、ジャーナルに記録された書き込み操作はLedgerストレージと呼ばれる別のディスク領域に非同期に反映されます。Ledgerストレージに永続化されたメッセージは、トピックの全てのConsumerによって受信されるまで保存されます。Ledgerストレージへの書き込み中にサーバーの電源が落ちたりした場合でも、ジャーナルからデータを復旧する事が可能です。

BookKeeperが高いパフォーマンスを発揮するには、ジャーナルとLedgerストレージを別々のデバイスとするのが望ましいとされています。ジャーナルには容量は少なくても速度の速いSSDを使用し、Ledgerストレージには速度は遅くても容量の多いHDDを使用するのが一般的です。ただ、こうしたハードウェア構成でなければBookKeeperはデプロイできない、というわけではありません。

data_flow図4. Pulsarにおけるデータ永続化の流れ

サーバーの起動方法

それでは、Bookieサーバーを起動してみましょう。Bookieの設定ファイルは conf/bookkeeper.conf です。多数の設定項目が存在しますが、最低限、次の設定は変更する必要があります。

# ジャーナルとして使用するディレクトリのパス
journalDirectories=data/bookkeeper/journal

# Ledgerストレージとして使用するディレクトリのパス
ledgerDirectories=data/bookkeeper/ledgers

# Local ZooKeeperのホスト名とポートをコンマ区切りで列挙
# BookieはLedgerのメタデータや自分自身が稼働状態にあるかどうかといった情報をLocal ZooKeeperに保存します
zkServers=zk1.pulsar.yahoo.co.jp:2181,zk2.pulsar.yahoo.co.jp:2181,zk3.pulsar.yahoo.co.jp:2181

設定ファイルの編集が完了したら、次のコマンドでBookieサーバーのプロセスをデーモンとして起動します。

$ bin/pulsar-daemon start bookie

Bookieサーバーが正常に起動できたかどうかは、次のコマンドでサニティテストを実行する事で確認できます。サニティテストでは、ローカルのBookieに対してLedger(アンサンブルサイズは1)の作成、書き込み、読み込み、削除を行います。

$ bin/bookkeeper shell bookiesanity

サニティテストが成功したら、他のBookieサーバーも同様に起動していきましょう。

設定項目

Bookieの設定項目は多岐にわたるため、その全てをご紹介する事はできません。以下にその中でも比較的重要と思われるものを挙げておきます。

設定項目 説明 デフォルト値
bookiePort Bookieサーバーがリッスンするポート 3181
useHostNameAsBookieID Bookieが自分自身のIDとしてホスト名を使用するかどうか。falseの場合はIPアドレスを使用 false
numAddWorkerThreads 書き込みリクエストを処理するワーカースレッドの数。0の場合はNettyのスレッドが直接処理 0
numReadWorkerThreads 読み込みリクエストを処理するワーカースレッドの数。0の場合はNettyのスレッドが直接処理 8
numHighPriorityWorkerThreads 優先度の高い特殊なリクエストを処理するワーカースレッドの数 8
autoRecoveryDaemonEnabled あるBookieがダウンした際にそのデータを他のBookieに再複製するデーモンを起動するかどうか true
journalMaxSizeMB ジャーナルファイル1つあたりの最大サイズ(メガバイト) 2048
journalMaxBackups 古いジャーナルファイルのバックアップ数 5
journalSyncData ジャーナルへの書き込みをディスクにフラッシュしてからBrokerに確認応答するかどうか true
prometheusStatsHttpPort Prometheusのメトリクスのエクスポーターが使用するポート 8000
readOnlyModeEnabled ディスク使用量がしきい値に達した際にBookieをReadOnlyモードに移行させるかどうか true
forceReadOnlyBookie Bookieを強制的にReadOnlyモードに変更するためのフラグ false
diskUsageThreshold BookieがReadOnlyモードに移行するディスク使用量のしきい値 0.95
httpServerEnabled 管理用のAPIを提供するHTTPサーバーを有効にするかどうか false
httpServerPort HTTPサーバーがリッスンするポート 8000
dbStorage_writeCacheMaxSizeMb 書き込みキャッシュとして使用するダイレクトメモリのサイズ(メガバイト) ダイレクトメモリの1/4
dbStorage_readAheadCacheMaxSizeMb 読み込みキャッシュとして使用するダイレクトメモリのサイズ(メガバイト) ダイレクトメモリの1/4

Brokerのデプロイ

必要なサーバー数

ZooKeeperとBookieのデプロイが完了したら、いよいよPulsarの核となるコンポーネントであるBrokerをデプロイしていきます。BrokerにはZooKeeperやBookieと違って「少なくとも〇〇台以上起動しなければならない」といった制限は存在しません。単一障害点を作らないように、2台以上起動するのがいいでしょう。

サーバーの起動方法

Brokerの設定ファイルは conf/broker.conf です。以下の設定項目に適切な値を入れてください。

# Local ZooKeeperのホスト名とポートをコンマ区切りで列挙
zookeeperServers=zk1.pulsar.yahoo.co.jp:2181,zk2.pulsar.yahoo.co.jp:2181,zk3.pulsar.yahoo.co.jp:2181

# Configuration Storeのホスト名とポートをコンマ区切りで列挙(今回はLocal ZooKeeperと同じ)
configurationStoreServers=zk1.pulsar.yahoo.co.jp:2181,zk2.pulsar.yahoo.co.jp:2181,zk3.pulsar.yahoo.co.jp:2181

# 「クラスターメタデータの初期化」で登録したクラスター名を指定
clusterName=pulsar-cluster-1

# Ledgerのアンサンブルサイズ
managedLedgerDefaultEnsembleSize=2

# LedgerのWriteクォーラムサイズ
managedLedgerDefaultWriteQuorum=2

# LedgerのAckクォーラムサイズ
managedLedgerDefaultAckQuorum=2

設定ファイルの編集が完了したら、次のコマンドでBrokerサーバーのプロセスをデーモンとして起動します。

$ bin/pulsar-daemon start broker

全てのホストでBrokerサーバーが起動できたら、Pulsarクラスターの構築は完了です。

設定項目

Brokerにもまた、多数の設定項目が存在しています。ここではその一部をご紹介します。

設定項目 説明 デフォルト値
brokerServicePort Pulsarプロトコルのポート 6650
brokerServicePortTls TLS用のPulsarプロトコルのポート。空でない場合はTLSが有効になります
webServicePort HTTPサーバーのポート 8080
webServicePortTls HTTPSサーバーのポート。空でない場合はTLSが有効になります
numIOThreads NettyのI/Oスレッドの数 CPUのコア数の2倍
numHttpServerThreads HTTPリクエストを処理するスレッドの数 CPUのコア数の2倍
backlogQuotaCheckEnabled トピックごとのストレージ使用量のチェックを有効にするかどうか true
backlogQuotaDefaultLimitGB 1トピックあたりのストレージの割り当て量(ギガバイト)。0未満の場合は無制限 -1
backlogQuotaDefaultRetentionPolicy あるトピックのストレージの使用量が割り当て量を超過した場合の挙動(※1) producer_request_hold
ttlDurationDefaultInSeconds Producerから送信されたメッセージの生存時間。0の場合は無限 0
subscriptionExpirationTimeMinutes サブスクリプションの生存時間。0の場合は無限 0
dispatchThrottlingRatePerTopicInMsg 1トピックあたりのConsumerへのメッセージの配信速度(メッセージ/秒)。0の場合は無制限 0
dispatchThrottlingRatePerTopicInByte 1トピックあたりのConsumerへのメッセージの配信速度(バイト/秒)。0の場合は無制限 0
dispatchThrottlingRatePerSubscriptionInMsg 1サブスクリプションあたりのConsumerへのメッセージの配信速度(メッセージ/秒)。0の場合は無制限 0
dispatchThrottlingRatePerSubscriptionInByte 1サブスクリプションあたりのConsumerへのメッセージの配信速度(バイト/秒)。0の場合は無制限 0
maxProducersPerTopic 1トピックに接続可能なProducer数の上限。0の場合は無制限 0
maxConsumersPerTopic 1トピックに接続可能なConsumer数の上限。0の場合は無制限 0
maxConsumersPerSubscription 1サブスクリプションに接続可能なConsumer数の上限。0の場合は無制限 0
maxMessageSize メッセージサイズの上限(バイト) 5242880
maxNumPartitionsPerPartitionedTopic パーティションドトピックのパーティション数の上限。0の場合は無制限 0
tlsCertificateFilePath TLS証明書のファイルパス
tlsKeyFilePath TLS秘密鍵のファイルパス
tlsTrustCertsFilePath Brokerが信頼するTLS証明書のファイルパス
tlsAllowInsecureConnection 安全でないTLSクライアント証明書の使用を許可するかどうか false
tlsRequireTrustedClientCertOnConnect クライアント接続時にTLSクライアント証明書を求めるかどうか false
authenticationEnabled 認証を有効にするかどうか false
authenticationProviders 認証方式を決定するプラグインのクラス名を指定(※2)
authorizationEnabled 認可を有効にするかどうか false
superUserRoles 全ての権限を与える「スーパーユーザー」のロール名を指定
managedLedgerCacheSizeMB Brokerが保持するLedgerのEntryのキャッシュサイズ(メガバイト) ダイレクトメモリの1/5
defaultRetentionTimeInMinutes 全てのConsumerに受信されたメッセージを何分間まで保存しておくか 0
defaultRetentionSizeInMB 全てのConsumerに受信されたメッセージを何メガバイトまで保存しておくか 0
webSocketServiceEnabled WebSocket APIを有効にするかどうか false
functionsWorkerEnabled Pulsar Functionsのワーカーを起動するかどうか false

※1:次の3つのポリシーのいずれかを選択できます。

ポリシー ストレージの割り当て量を超過した場合の挙動
producer_request_hold Producerから送信されたメッセージの永続化を保留します。
producer_exception Producer側で例外がスローされます。
consumer_backlog_eviction バックログのメッセージが古いものから削除されます。

※2:Pulsar 2.6.1の時点で実装されている認証プラグインは以下の通りです。

クラス名 認証方式
org.apache.pulsar.broker.authentication.AuthenticationProviderBasic Basic認証
org.apache.pulsar.broker.authentication.AuthenticationProviderTls TLSクライアント認証
org.apache.pulsar.broker.authentication.AuthenticationProviderToken JSON Web Token認証
org.apache.pulsar.broker.authentication.AuthenticationProviderSasl SASL(Kerberos)認証
org.apache.pulsar.broker.authentication.AuthenticationProviderAthenz Athenz認証

動作確認

クラスターの構築が完了したら、実際にPulsarクライアントを使用してメッセージの送受信を試してみましょう。動作確認には、バイナリパッケージに含まれているCLIツール pulsar-admin および pulsar-client を使用します。

最初にクライアントの設定ファイル conf/client.conf を編集する必要があります。次のようにクラスターのURLを指定してください。

# クラスターのHTTPサービスのURL
webServiceUrl=http://broker.pulsar.yahoo.co.jp:8080

# クラスターのBrokerサービスのURL
brokerServiceUrl=pulsar://broker.pulsar.yahoo.co.jp:6650

もし上記のように複数のBrokerサーバーに到達できる共通のドメイン broker.pulsar.yahoo.co.jp が存在しない場合には、次のように複数のBrokerサーバーのホスト名を列挙してください。

webServiceUrl=http://host1:8080,host2:8080,host3:8080
brokerServiceUrl=pulsar://host1:6650,host2:6650,host3:6650

設定ファイルの編集が完了したら、pulsar-admin コマンドを使って自分のトピックが所属するテナントおよびネームスペースを作成します。

# 「my-tenant」というテナントを作成
$ bin/pulsar-admin tenants create my-tenant

# 「my-tenant/my-ns」というネームスペースを作成
$ bin/pulsar-admin namespaces create my-tenant/my-ns

続いて、pulsar-client コマンドを使ってConsumerを起動します。この際、トピックとサブスクリプションは自動的に作成されます。

# トピック「persistent://my-tenant/my-ns/my-topic」にサブスクリプション「my-sub」を作成し、メッセージを5つ受信
$ bin/pulsar-client consume -s my-sub -n 5 persistent://my-tenant/my-ns/my-topic

Consumerを起動したままもう1つターミナルを開き、Producerを起動してメッセージを送信してみます。

# トピック「persistent://my-tenant/my-ns/my-topic」に「my-msg」というメッセージを5つ送信
$ bin/pulsar-client produce -m my-msg -n 5 persistent://my-tenant/my-ns/my-topic

正常に動作していれば、Consumer側で5つのメッセージが受信できるはずです。

----- got message -----
key:[null], properties:[], content:my-msg
----- got message -----
key:[null], properties:[], content:my-msg
----- got message -----
key:[null], properties:[], content:my-msg
----- got message -----
key:[null], properties:[], content:my-msg
----- got message -----
key:[null], properties:[], content:my-msg

おわりに

第5回の内容は以上です。今回はPulsarのサーバーサイドのデプロイ方法と各コンポーネントの設定項目の一部をご紹介いたしました。

前述の通り、今回の解説はデプロイ対象の環境が(オンプレミスの)物理マシンまたは仮想マシンである事を前提としたものでした。しかし、Pulsarの公式ドキュメントではAmazon Web ServicesやKubernetesといった環境へのデプロイ方法も紹介されています。詳細は下記のページをご参照ください。

さて、Pulsarについて紹介する記事の連載は今回でいったん一区切りとなります。日本国内での認知度はまだまだ高いとは言えないPulsarですが、今回の連載で少しでも多くの方に興味を持っていただけたら幸いです。


坂本 雅宏
エンジニア

Yahoo! JAPANでは情報技術を駆使して人々や社会の課題を一緒に解決していける方を募集しています。詳しくは採用情報をご覧ください。

関連記事

このページの先頭へ