こんにちは。ヤフー株式会社 システム統括本部 の津田です。私は現在、キューイング, Pub-Sub, ストリーミングなどを実現するためのメッセージングプラットフォームを社内向けに提供するチームに所属しています。
前回はOSS Apache Pulsar(以降、Pulsarと記載します)の全体概要やサンプルコードの動かし方を紹介しました。
今回はPulsarのクライアントの使い方をJavaクライアントベースで紹介します。紹介の流れは以下の通りです。
- 基本的な使い方
- Producer
- Consumer
- その他機能の使い方
- 非同期メソッド
- メッセージの圧縮
- メッセージの暗号化
- メッセージの再送
- partitionedトピック
なお、本記事はPulsarの用語を用いていますが、一部のPulsarの用語に関しての説明はありません。このため、事前に前回の記事を読んでいただけると幸いです。
ヤフー社内におけるPulsar活用状況
私の所属するチームはPulsarがOSS化される2016年9月頃から携わっており、Pulsarのコミッターも複数名在籍しています。社内では、Pulsarを安定的にユーザーに利用していただけるように運用することに加えて社内における需要・事例をもとに機能拡張・バグ修正等の開発を通じたOSSへの貢献もしています。
ここで運用するPulsarを社内のプロダクトが利用することにより、プロダクトを開発するエンジニアはメッセージングプラットフォームを個別に運用する必要がなくなります。その結果、プロダクトは本来やるべきサービスやそれに伴うユーザー価値の創造に専念できるようになります。
ヤフーとPulsarの関わりについては、別記事を投稿しているのでこちらもご覧ください。
基本的な使い方
まずはProducerとConsumerの基本的な使い方を見ていきましょう。
Producer
Producerのサンプルクラスは以下の通りですが、コードベースで解説していきます。
package pulsar.hello;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
class HelloProducer {
public static void main(String[] args) throws Exception {
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://public/default/my-topic")
.create();
for (int i = 0; i < 10; i++) {
// メッセージを送信する
final String message = "my-message-" + i;
producer.send(message.getBytes());
System.out.println("Send message: " + message);
}
producer.close();
pulsarClient.close();
}
}
PulsarClientの作成
buildメソッドでPulsarClientを作成します。
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
この際、serviceUrlメソッドで接続先のBrokerサーバーを指定します。
Producerの作成
createメソッドでProducerを作成します。
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://public/default/my-topic")
.create();
この際、topicメソッドでメッセージの送信先となるトピックを指定します。指定するトピックですが、以下のいずれかの永続化ポリシーを指定する必要があります。
永続化ポリシー | 概要 |
---|---|
persistent | メッセージが永続化されることが保証されています。 |
non-persistent | メッセージは永続化されません。 |
なお、Brokerサーバーがデフォルト設定の場合は、メッセージ送受信時に自動的にトピックが作成されます。設定を変更する場合はこちらを変更し、Brokerサーバーを再起動すれば良いです。
メッセージの送信
sendメソッドでメッセージを送信します。
for (int i = 0; i < 10; i++) {
// メッセージを送信する
final String message = "my-message-" + i;
producer.send(message.getBytes());
System.out.println("Send message: " + message);
}
sendメソッドはメッセージの送信が完了したら呼び出し元に制御を戻します。
コネクションのClose
closeメソッドでBrokerサーバーとのコネクションを切断します。
producer.close();
pulsarClient.close();
Consumer
Consumerのサンプルクラスは以下の通りですが、コードベースで解説していきます。
package pulsar.hello;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
class HelloConsumer {
public static void main(String[] args) throws Exception {
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription-name")
.subscribe();
for (int i = 0; i < 10; i++) {
// メッセージを受信する
final Message message = consumer.receive();
System.out.println("Received message: " + new String(message.getData()));
// BrokerにACKを返す
consumer.acknowledge(message);
}
consumer.close();
pulsarClient.close();
}
}
PulsarClientの作成
Producerのコードと同様ですが、buildメソッドでPulsarClientを作成します。
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumerの作成
subscribeメソッドでConsumerを作成します。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription-name")
.subscribe();
Producerのコードと同様ですが、topicメソッドで永続化ポリシーとメッセージの受信先となるトピックを指定します。
また、Consumerに関してはsubscriptionNameメソッドでサブスクリプションを指定します。サブスクリプションはメッセージ受信時に自動的に作成されます。
メッセージの受信
receiveメソッドでメッセージを受信します。
for (int i = 0; i < 10; i++) {
// メッセージを受信する
final Message message = consumer.receive();
System.out.println("Received message: " + new String(message.getData()));
// BrokerにACKを返す
consumer.acknowledge(message);
}
receiveメソッドはメッセージの受信が完了したら呼び出し元に制御を戻します。
acknowledgeメソッドでBrokerサーバーにメッセージのAckを行う必要があります。AckはConsumerでメッセージを受信して処理したよ。というメッセージになると考えてもらえれば良いです。
なお、Brokerサーバーで各メッセージを保持していますが、永続化ポリシーをpersistentとしていた場合、全てサブスクリプションからAckを行うまでは対象メッセージを保持し続けます。Ackを行わないとBrokerサーバーでメッセージがたまり続けますので注意が必要です。
コネクションのClose
closeメソッドでBrokerサーバーとのコネクションを切断します。
consumer.close();
pulsarClient.close();
サブスクリプションタイプの指定
ConsumerのサブスクリプションタイプはsubscriptionTypeメソッドで指定可能です。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription-name")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
指定可能なサブスクリプションタイプに関しては前回の記事をお読みください。
その他機能の使い方
Pulsarでは、「基本的な使い方」で紹介した機能以外にもさまざまな機能があります。いくつかピックアップして機能ごとに使い方を見ていきましょう。
非同期メソッド
「基本的な使い方」で紹介したサンプルクラスにおいてメッセージ送受信のコードがありましたが、こちらは同期メソッドを使っています。メッセージの送受信ですが、非同期メソッドも用意されています。
前述しましたが、同期メソッドの場合はメッセージの送受信が完了したら呼び出し元に制御を戻します。対して、非同期メソッドの場合はメッセージの送受信が完了する前に呼び出し元に制御を戻します。(この際、CompletableFutureのインスタンスを返します)
それではProducerとConsumerの使い方を見ていきましょう。
Producer
「基本的な使い方」のサンプルクラスではsendメソッドを使ってメッセージを送信していましたが、こちらをsendAsyncメソッドに変更します。
List<CompletableFuture<Void>> result = new ArrayList<>();
for (int i = 0; i < 10; i++) {
// メッセージを送信する
final String message = "my-message-" + i;
result.add(
producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
System.out.println("Send message: " + message);
})
);
}
// 非同期メソッド処理の終了を待ち受ける
CompletableFuture.allOf(result.toArray(new CompletableFuture[result.size()])).join();
Consumer
「基本的な使い方」のサンプルクラスではreceiveメソッドを使ってメッセージを受信していましたが、こちらをreceiveAsyncメソッドに変更します。
List<CompletableFuture<Void>> result = new ArrayList<>();
for (int i = 0; i < 10; i++) {
// メッセージを受信する
result.add(
consumer.receiveAsync().thenAccept(message -> {
System.out.println("Received message: " + new String(message.getData()));
// BrokerにACKを返す
consumer.acknowledgeAsync(message);
})
);
}
// 非同期メソッド処理の終了を待ち受ける
CompletableFuture.allOf(result.toArray(new CompletableFuture[result.size()])).join();
また、Consumerに関してはリスナーが使えます。messageListenerメソッドでリスナーがメッセージを受信した際の挙動を記述します。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription-name")
.messageListener(new MessageListener<byte[]>() {
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
System.out.println("Received message: " + new String(message.getData()));
consumer.acknowledgeAsync(message);
}
})
.subscribe();
メッセージの圧縮
ProducerとConsumer間のメッセージは以下の形式で圧縮できます。
- LZ4
- ZLib
- ZSTD
- SNAPPY
メッセージを圧縮するとメッセージサイズが小さくなりますので、Brokerサーバーの負荷、通信コストの低減、ストレージの節約につながります。特にメッセージサイズが大きい場合は効果が高くなります。
ただし、ProducerとConsumerに関しては単純に圧縮と展開処理が追加となるため、処理コストが高くなります。
メッセージの圧縮を行う場合はProducerのコードを実装する必要があります。compressionTypeメソッドで圧縮形式を指定するとメッセージの圧縮が行われます。
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://public/default/my-topic")
.compressionType(CompressionType.LZ4)
.create();
圧縮されたメッセージのメタデータには圧縮形式が記録されています。Consumerはメタデータの圧縮形式を参照し、自動的に展開処理を行うようになっているためコードの変更は不要です。
メッセージの暗号化
ProducerとConsumer間のメッセージは暗号化できます。
Pulsarでは、公開鍵暗号(非対称暗号)と共通鍵暗号(対称暗号)を組み合わせたハイブリッド暗号が使用されます。
Producerは公開鍵、Consumerは秘密鍵を使います。このため、事前にECDSAまたはRSAの公開鍵と秘密鍵のペアを作成し、ProducerとConsumer側に配置する必要があります。
# ECDSA
$ openssl ecparam -name secp521r1 -genkey -param_enc explicit -out private.key
$ openssl ec -in private.key -pubout -out public.key
# RSA
$ openssl genrsa -out private.key 2048
$ openssl rsa -in private.key -pubout -out public.key
まずはインターフェースのCryptoKeyReaderを実装したクラスを用意します。このクラスはProducerとConsumerのサンプルクラスが公開鍵、秘密鍵をロードするために使用します。
package pulsar.hello;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
public class RawFileKeyReader implements CryptoKeyReader {
private String publicKeyFile = "";
private String privateKeyFile = "";
public RawFileKeyReader(String publicKeyFile, String privateKeyFile) {
this.publicKeyFile = publicKeyFile;
this.privateKeyFile = privateKeyFile;
}
@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
} catch (IOException e) {
System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
e.printStackTrace();
}
return keyInfo;
}
@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
} catch (IOException e) {
System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
e.printStackTrace();
}
return keyInfo;
}
}
それではProducerとConsumerの使い方を見ていきましょう。
Producer
cryptoKeyReader, addEncryptionKeyメソッドで公開鍵の情報を指定するとメッセージの暗号化が行われます。
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://public/default/my-topic")
.cryptoKeyReader(new RawFileKeyReader("/path/to/public.key", null))
.addEncryptionKey("my-app-key")
.create();
Consumer
cryptoKeyReaderメソッドで暗号鍵の情報を指定するとメッセージの復号が行われます。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription-name")
.cryptoKeyReader(new RawFileKeyReader(null, "/path/to/private.key"))
.subscribe();
メッセージの再送
Consumerですが、Ackを行っていないメッセージを再送できます。
メッセージの再送は以下の手段で行うことができますが、それぞれ見ていきましょう。
- NegativeAck
- Ackのタイムアウト
NegativeAck
NegativeAckはメッセージの処理失敗時などでメッセージの再送を行いたい場合に使います。「基本的な使い方」でacknowledgeメソッドを使ってAckを行ってましたが、代わりにnegativeAcknowledgeメソッドを使ってNegativeAckを行います。
// BrokerにNegativeAckを返す
consumer.negativeAcknowledge(message);
なお、メッセージの再送ですが、NegativeAckを行った後に一定時間が経過してから行われます。デフォルト設定の場合は1分後に再送されますが、こちらはnegativeAckRedeliveryDelayメソッドで指定可能です。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription-name")
.negativeAckRedeliveryDelay(10, TimeUnit.SECONDS)
.subscribe();
Ackのタイムアウト
Ackのタイムアウトですが、Consumerがメッセージを受信してから対象メッセージのAckを行うまでの間で、設定したタイムアウト値の時間を超過した場合に発生します。この際、対象メッセージがConsumerに再送されます。ackTimeoutメソッドでAckのタイムアウト値を指定できます。
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription-name")
.ackTimeout(10, TimeUnit.SECONDS)
.subscribe();
partitionedトピック
partitionedトピックは1つのトピックに複数の内部トピックがある特殊なトピックです。
前回の記事でも触れましたが、通常、1つのトピックは1つのBrokerサーバーが担当しています。各Brokerサーバーはトピックごとにスループットを制限しているため、通常のトピックを使用している場合は制限値以上のスループットは出ません。
partitionedトピックは複数の内部トピックで構成されており、内部トピックごとにBrokerサーバーの担当が割り振られます。担当となったBrokerサーバーの数にも依存しますが、トピックのスループットの向上が期待できます。
partitionedトピックは以下の様に作成できます。永続化ポリシーは通常のトピックと同様にpersistentかnon-persistentのいずれかを指定できます。
$ pulsar-admin topics create-partitioned-topic --partitions 4 persistent://public/default/my-partitioned-topic
上記を作成した場合は以下の内部トピックが作成されます。(内部トピックには"-partition-<連番>"が付与されます)
persistent://public/default/my-partitioned-topic-partition-0
persistent://public/default/my-partitioned-topic-partition-1
persistent://public/default/my-partitioned-topic-partition-2
persistent://public/default/my-partitioned-topic-partition-3
なお、ProducerとConsumerは内部トピックではなく以下のpartitionedトピックを指定します。
persistent://public/default/my-partitioned-topic
それではpartitionedトピックの使い方を見ていきましょう。
ルーティングモード
基本的な使い方は通常のトピックと同様ですが、partitionedトピックにはルーティングモードという概念があります。ルーティングモードとは、Producerがpartitionedトピックに対して送信したメッセージがどの内部トピックにルーティングされるかを決定する仕組みです。
基本的なルーティングモードは以下の3種類が存在します。
- キーハッシュ
- シングルパーティション
- ラウンドロビンパーティション
ルーティングモードですが、Producerのコードで指定できます。ルーティングモードをProducerで指定しない場合、Javaクライアントのバージョンが1系の場合はシングルパーティション、2系の場合はラウンドロビンパーティションがデフォルトで使われます。
それではルーティングモードごとに使い方を見ていきましょう。
キーハッシュ
キーハッシュとは、キーの指定されたメッセージに対して適用されるモードです。同じキーを持つメッセージは同じ内部トピックにルーティングされるため、キーが同じメッセージの順番は保証されます。
sendメソッドを実行する前に、keyメソッドでkey値を指定するとキーハッシュを利用できます。
for (int i = 0; i < 10; i++) {
// メッセージを送信する
final String message = "my-message-" + i;
producer.newMessage()
.value(message.getBytes())
.key("key" + (i % 2))
.send();
System.out.println("Send message: " + message);
}
シングルパーティション
シングルパーティションとは、キーの指定されていないメッセージに対して適用されるモードです。メッセージはランダムに選択された1つの内部トピックにのみルーティングされるため、あるProducerが送信したメッセージは順番が保証された状態でConsumerに届きます。ただし、Producerが1つだけの場合は通常のトピックと同じスループットしか実現できません。
messageRoutingModeメソッドでルーティングモードを指定するとシングルパーティションを利用できます。
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://public/default/my-partitioned-topic")
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
ラウンドロビンパーティション
ラウンドロビンパーティションとは、キーの指定されていないメッセージに対して適用されるモードです。メッセージは異なる内部トピックにラウンドロビンでルーティングされます。高いスループットが実現できる一方でメッセージの順番は保証されません。
messageRoutingModeメソッドでルーティングモードを指定するとラウンドロビンパーティションを利用できます。
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://public/default/my-partitioned-topic")
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();
おわりに
今回はJavaクライアントベースでクライアントの使い方を紹介させていただきました。
Pulsarのクライアントですが、Java以外にもC++, Node.js, Python, Goをサポートしています。いずれのクライアントも基本的にはJavaクライアントと同一の機能がありますので、興味があれば試していただければと思います。
不定期での投稿になるかと思いますが、次回もお楽しみに!
こちらの記事のご感想を聞かせください。
- 学びがある
- わかりやすい
- 新しい視点
ご感想ありがとうございました
- 津田 秀介
- メッセージングプラットフォーム エンジニア
- Apache Pulsarをベースにしたメッセージングプラットフォームを開発・運用しています。