2016年12月 9日

アーキテクチャ

高速ファイル/メッセージ転送 K2HFTFUSE の紹介

  • このエントリーをはてなブックマークに追加

こんにちは、technical yahoo の中谷です。
今回は、Yahoo! JAPANからオープンソースとして公開した高速ファイル/メッセージ転送システムの K2HFTFUSE の紹介をします。
K2HFTFUSEは、確実で高速なファイル/メッセージ転送を低コストで実現するために開発されたシステムです。

K2HFTFUSE(K2Hash File Transaction by FUSE-based file system)とは、FUSE(Filesystem in Userspace)によるユーザースペースでのマウント機能を利用したファイル/メッセージ転送システムです。
K2HFTFUSEは、仮想ファイルシステムを提供し、マウントしたディレクトリにファイルを書き込むだけで利用できます。
既存プログラムの出力ファイルのディレクトリをK2HFTFUSEでマウントするだけで、既存プログラムの変更なしにファイル/メッセージ転送ができます。

ファイル転送の一例として、ログファイルを転送する場合、既存のプログラムを変更することなく、ログを高速かつ確実に集約できます。
類似したシステム(プログラム)では、fluentdがあります。
fluentdと比較しても、高速であり、確実に転送を行うことができるシステムです。
K2HFTFUSEは、テキストファイルへ1行分のデータを書き込んだタイミングで、そのデータを転送します。
この他、転送する内容は、テキストだけではなく、バイナリデータ、メッセージ(1データを1メッセージとする)の転送ができます。
また、転送するデータに対して、任意のフィルタリング、任意の処理を行うこともできます。

背景

ウェブサービスに限らず、ログファイルやデータなどを集約することは昨今当たり前のように必要となっています。
大規模なログなどの集約を行うケースなどでは、Kafka(+Storm)で構築されたシステムが用いられていることと思われますが、小規模であれば簡単にfluentdで構築されたシステムもあるかと思います。

K2HFTFUSE は、同時公開された基礎ライブラリ/システム(K2HASH, CHMPX)を利用して、fluentdのように簡単に、そしてKafka + Stormのように高速で確実な転送ができないかという試みで開発されました。
libfuse(FUSE)をベースに基礎ライブラリ/システムを使った結果、期待以上の高パフォーマンスで可用性、スケーラビリティのあるシステムとなっています。

K2HFTFUSEの構成

まず、以下にK2HFTFUSEを利用したデータ(ファイル/メッセージ)転送の概要図を示します。

Overview

上記に示すように、K2HFTFUSEシステムは、libfuse(FUSE)を利用したFUSE派生プログラムになっています。
K2HFTFUSEプログラムが転送したデータ(ファイル/メッセージ)は、K2HFTFUSESVRプログラムで受信することができ、転送先で集約できます。
転送するデータがテキストを対象とする(ログファイルなど)場合、テキストファイルであれば、1行単位で転送が行われます。

FUSEを利用することにより、転送側のプログラムの変更は必要なく、出力ファイル(ログファイル)などのディレクトリをFUSEによりマウントするだけで、転送ができるようになっています。
これにより、既存システムへの導入に対しての障壁はなく、fluentdと同様に簡単に導入できます。

K2HFTFUSEでは、複数の転送元と複数の転送先を任意に構成できます。
複数の転送元からのデータ(ファイル/メッセージ)を1つの転送元ホストに集約できます。
また、複数の転送先ホストを準備して、分散転送させることも、同報転送もできます。
この通信制御は、今回同時公開されたシステム(CHMPX)により実現される機能となっています。

K2HFTFUSEの名前に含まれている”K2H”はK2HASHを意味しています。K2HASHは独自のKVS(NoSQL)ライブラリであり、このK2HASHを利用したシステムでもあります。
このK2HASHライブラリを利用することで、転送するデータ(ファイル/メッセージ)のキューイングを実現しています。
これにより、通信断、転送先ホストの高負荷などによる転送遅延が発生したとしても、転送データそのものはキューイングされ、ロストする確立を低下してくれています。

K2HFTFUSEの特徴

転送元/転送先の構成

転送元と転送先の構成は、上述したとおりCHMPXというプログラムに任せています。
簡単な説明として、このCHMPXはSocketによるホスト間の常時接続を提供し、ホスト上のプログラム同士の透過的なコミュニケーションを提供する通信ミドルウエアです。
K2HFTFUSEは、このCHMPXを利用することにより、転送元、転送先のホスト接続構成を構築しています。
K2HFTFUSEでは、以下のような接続を提供します。

Flow

転送先ホストを複数とした場合には、転送するデータを、1台のみに、または同報転送として全台に転送することが可能です(この機能はCHMPXの提供する機能です)。
また、1台のみに転送する場合であっても、転送先ホストと通信できない(障害などで)ケースでは、自動的に補助ホストに転送もできます。
これにより即時性を失わず、転送を継続することができ、障害耐性にも強い構成を作ることができます。

多段転送

K2HFTFUSEは、上記のように1段の転送のみではなく、多段の転送もできます。

Multistage

多段に組むことにより、各コロ(データセンターなど)で集約したデータをさらに集約するような用途にも利用できます。

データの保全

テータの転送を行うときに、利用者が気になるデータの保全について説明します。
K2HFTFUSEでは、転送元でデータを書き込んだときに(FUSEでマウントしたディレクトリ以下のファイルへの書き込み)、いったんローカルシステム上のK2HASHにデータがキューイングされます。
キューイングされたデータは、通信状況が正常であれば、CHMPXにより転送先に送り出されます。
この処理におけるタイムラグはほとんどなく通信状況が正常であれば瞬時に行われます。
障害などで通信できない状況の場合には、キューイングされ、通信状況が回復した時点で転送が再開されます。

Queuing

もし、K2HFTFUSEのマウントが外れた場合には、あらかじめマウント時にマウントポイント以下にディレクトリを準備しておくなどすることでローカルファイルとしてK2HFTFUSE導入前と同じようにローカルファイルに出力されます。
K2HFTFUSEを再マウントする前にこのファイルを転送することで再開も可能です(この場合オペレーションは必要です)。

独自処理の組み込み

K2HFTFUSEを利用する上で、提供されているプログラムで基本的なファイル/メッセージの転送ができます。
しかし、用途によっては独自の処理を加えたい場合、ひとつの転送元から多種のデータを転送したい場合など、カスタマイズを加えたいケースがあります。

K2HFTFUSEでは、送信元、中継時、送信先でローカルファイルへの出力を行ったり、外部のプラグインプログラムに処理をさせることができます。

これらは、K2HFTFUSEおよびK2HFTFUSESVRの設定ファイルで詳細に指定できます。
外部のプログラムをプラグインとして起動し、受け取ったデータを引き渡すことができ、これにより独自の処理を行うことができます。

外部のプログラムとK2HFTFUSEおよびK2HFTFUSESVRは、標準入出力をPIPEで接続しているため、パフォーマンスに不安のある場合には、K2HFTFUSESVR自体を自作できます。
ソースコードk2hftfusesvr.ccを参考にして作成できます。
こちらについては、次回以降でサンプルプログラムなど説明できればと思います。

データのフィルターと加工について

データを転送する場合、特定の文字列にマッチングした場合のみ転送したい、特定のデータだけを転送したくないなどのケースが存在します。
これらは、転送元のK2HFTFUSEの設定にて指定することができ、データのフィルタリングを提供しています。

また、複数の転送元からデータの集約を行う場合、どのホストから、何時作成されたデータなのか必要となる場合があります。
K2HFTFUSEでは、デフォルトで、転送元ホスト、データを書き込んだプロセスID、データの書き込まれた時刻等々のデータに付随する情報も一緒に転送されています。
転送先でデータを受信後に、どの付属情報を利用するか、どのように加工するかなどの設定もできます。

パフォーマンスについて

転送元ホストを4台(VM)、転送先ホストを1台(VM)として、それぞれ 2CPU(core) / 4GB memory のスペックでK2HFTFUSEシステムを構築した場合を測定しました。

  • Hypervisor spec: Xeon E5-2650L v3 1.80GHz(24 cores, 48 threads) / 503GB / 300GB RAID-1

転送元が任意のプログラムで特定ファイル(K2HFTFUSEのマウントポイント以下にあるファイル)へ書き込みを行い、転送先は受信したデータを全てローカルファイル(ext4)に出力します。
転送するデータは、テキストファイルであり、1行あたり4KBから10byteまでのデータです。

Data length per line(bytes) (lines / second)
4096 20766
1024 158112
128 292382
10 475638

テストの結果、4KB/行のテキストファイル転送において、20,000行/secの転送速度を記録しました。
なお、今回のテストでは、転送先ホストへの通信が700Mbps程度(4KB * 20K byte/sec)となっており、利用した試験環境のネットワーク上限となっていました。
後日、報告の機会があれば、より詳細な計測結果を公開していきたいと思います。

試行

実際に、ログファイルを例にしてファイル内容を転送してみます。
以下の例では、K2HFTFUSEでApacheのログファイルを転送するサンプルを以下に示します。

前提

foo[0-9].yahoo.co.jp という10台のホスト上で、Apacheを使用してウェブサービスを実行します。
Apacheはログファイルとして各ホストの/home/apache/logsディレクトリにaccess.logとerror.logを出力します。
このサンプルではこれらのログファイルを1台のホスト bar.yahoo.co.jp に集約します。
(access.logとerror.log以外のファイルは/home/apache/logsディレクトリに出力されないものとします。)

設定ファイル

  • Apacheの動作しているホストで利用する設定ファイルとして、slave.yaml(YAML形式)ファイルを準備します。
#
# CHMPX GLOBAL SETTING
#
GLOBAL:
    {
        FILEVERSION:            1,
        DATE:                   "Thu, 01 Dec 2016 00:00:00 +0900",
        GROUP:                  K2HFUSETEST,
        MODE:                   SLAVE,
        DELIVERMODE:            random,
        MAXCHMPX:               32,
        REPLICA:                0,
        MAXMQSERVER:            8,
        MAXMQCLIENT:            8,
        MQPERATTACH:            1,
        MAXQPERSERVERMQ:        2,
        MAXQPERCLIENTMQ:        8,
        MAXMQPERCLIENT:         4,
        PORT:                   18020,
        CTLPORT:                18022,
        SELFCTLPORT:            18022,
        RWTIMEOUT:              100000,
        RETRYCNT:               1000,
        CONTIMEOUT:             500000,
        MQRWTIMEOUT:            50,
        MQRETRYCNT:             100000,
        DOMERGE:                on,
        SSL:                    no,
        K2HFULLMAP:             on,
        K2HMASKBIT:             4,
        K2HCMASKBIT:            4,
        K2HMAXELE:              2
    }

#
# CHMPX SERVER NODES SECTION
#
SVRNODE:
    [
        {
            NAME:               foo[0-9].yahoo.co.jp,
            PORT:               8020,
            CTLPORT:            8021,
            SSL:                no
        }
    ]

#
# CHMPX SLAVE NODES SECTION
#
SLVNODE:
    [
        {
            NAME:               "[.]*",
            CTLPORT:            8022
        }
    ]

#
# K2HTPDTOR(K2HASH TRANSACTION PLUGIN)
#
K2HTPDTOR:
    {
        K2HTPDTOR_BROADCAST:    no,
    }

#
# K2HFTFUSE
#
# K2HTYPE       type of k2hash used by transfer mode
# K2HFILE       file path of k2hash used by transfer mode, when file type
# K2HFULLMAP    mapping type of k2hash used by transfer mode, when file type
# K2HINIT       initializing of k2hash used by transfer mode, when file type
# K2HMASKBIT    init mask bit count of k2hash used by transfer mode, when file type
# K2HCMASKBIT   collision mask bit count of k2hash used by transfer mode, when file type
# K2HMAXELE     maximum element count of k2hash used by transfer mode, when file type
# K2HPAGESIZE   page size of k2hash used by transfer mode, when file type
# DTORTHREADCNT k2hdtor thread count
# DTORCTP       custom k2hdtor plugin file name(or path)
# BINTRANS      transfer as binary data array
# EXPIRE        grant the expiration date to transfer
# TRANSLINECNT  transfer line count limit at one time(default 0)
# TRANSTIMEUP   transfer timeup limit(default 0)
# BYTELIMIT     muxium bytes for one data length(default 0, means no limit)
#
K2HFTFUSE:
    {
        K2HTYPE:                mem,
        K2HMASKBIT:             4,
        K2HCMASKBIT:            4,
        K2HMAXELE:              2,

        #
        # K2HFTFUSE_RULE_DIR( K2HFTFUSE sub rule )
        #
        # TARGET        traget directory path
        # TRUNS         enable/disable flag for transfer
        # OUTPUTFILE    enable/disable flag for put file
        # PLUGIN        plugin program path
        # DEFAULTALL    default rule as DENY or ALLOW(DENY as default)
        # ALLOW         allowed rule, rule is static string or regex. and convert rule when regex.
        # DENY          denied rule, rule is static string or regex. and convert rule when regex.
        #
        K2HFTFUSE_RULE_DIR:
        [
            {
                TARGET:             /,
                TRUNS:              off,
                OUTPUTFILE:         /dev/null,
                DEFAULTALL:         ALLOW
            },
        ],

        #
        # K2HFTFUSE_RULE( K2HFTFUSE sub rule )
        #
        # TARGET        traget file path
        # TRUNS         enable/disable flag for transfer
        # OUTPUTFILE    enable/disable flag for put file
        # PLUGIN        plugin program path
        # DEFAULTALL    default rule as DENY or ALLOW
        # ALLOW         allowed rule, rule is static string or regex. and convert rule when regex.
        # DENY          denied rule, rule is static string or regex. and convert rule when regex.
        #
        K2HFTFUSE_RULE:
        [
            {
                TARGET:             access.log,
                TRUNS:              on,
                OUTPUTFILE:         /dev/null,
                DEFAULTALL:         ALLOW
            },
            {
                TARGET:             error.log,
                TRUNS:              on,
                OUTPUTFILE:         /dev/null,
                DEFAULTALL:         ALLOW
            },
        ]
    }
  • 次にログを集約するホストで利用する設定ファイルとして、server.yaml(YAML形式)ファイルを準備します。
#
# CHMPX GLOBAL SETTING
#
GLOBAL:
    {
        FILEVERSION:            1,
        DATE:                   "Thu, 01 Dec 2016 00:00:00 +0900",
        GROUP:                  K2HFUSETEST,
        MODE:                   SERVER,
        DELIVERMODE:            random,
        MAXCHMPX:               32,
        REPLICA:                0,
        MAXMQSERVER:            8,
        MAXMQCLIENT:            8,
        MQPERATTACH:            1,
        MAXQPERSERVERMQ:        2,
        MAXQPERCLIENTMQ:        8,
        MAXMQPERCLIENT:         4,
        PORT:                   18020,
        CTLPORT:                18021,
        SELFCTLPORT:            18021,
        RWTIMEOUT:              100000,
        RETRYCNT:               1000,
        CONTIMEOUT:             500000,
        MQRWTIMEOUT:            50,
        MQRETRYCNT:             100000,
        DOMERGE:                on,
        SSL:                    no,
        K2HFULLMAP:             on,
        K2HMASKBIT:             4,
        K2HCMASKBIT:            4,
        K2HMAXELE:              2
    }

#
# CHMPX SERVER NODES SECTION
#
SVRNODE:
    [
        {
            NAME:               foo[0-9].yahoo.co.jp,
            SSL:                no
        }
    ]

#
# CHMPX SLAVE NODES SECTION
#
SLVNODE:
    [
        {
            NAME:               "[.]*",
            CTLPORT:            8022
        }
    ]

#
# K2HFTFUSESVR
#
# TYPE              { trans | file | both }
# FILE_BASEDIR      output file base directory
# FILE_UNIFY        if puts one file for all, specify relative file path.(only file type)
# FILE_TIMEFORM     format for time data like strftime function.(only file type) + '%-': ns(decimal), %%=%
# PLUGIN            program file path for plugin
# FORMAT            output format.(only file type)
#                       %H  hostname
#                       %P  thread(process) id
#                       %F  file path
#                       %f  file name
#                       %T  time, formatted by FILE_TIMEFORM
#                       %L  output content data
#                   * If FORMAT is not specified, it means "%L".
#
# TRANSCONF         slave chmpx configuration file path.(only trans type)
# K2HTYPE           type of k2hash used by transfer mode
# K2HFILE           file path of k2hash used by transfer mode, when file type
# K2HFULLMAP        mapping type of k2hash used by transfer mode, when file type
# K2HINIT           initializing of k2hash used by transfer mode, when file type
# K2HMASKBIT        init mask bit count of k2hash used by transfer mode, when file type
# K2HCMASKBIT       collision mask bit count of k2hash used by transfer mode, when file type
# K2HMAXELE         maximum element count of k2hash used by transfer mode, when file type
# K2HPAGESIZE       page size of k2hash used by transfer mode, when file type
# DTORTHREADCNT     k2htpdtor thread count
# DTORCTP           custom transaction plugin
#
K2HFTFUSESVR:
    {
        TYPE:                   file,
        FILE_BASEDIR:           /home/apache/all,
        FILE_UNIFY:             log/unify.log,
        FILE_TIMEFORM:          "%F %T(%s %-)",
        #PLUGIN:                 /usr/local/bin/myprogram,
        FORMAT:                 "%H:%F(%P):%f[%T] %L",
        K2HMASKBIT:             4,
        K2HCMASKBIT:            4,
        K2HMAXELE:              2
    }

起動

  • まず、ログを集約するホストの準備をします。

    • 通信ミドルウエアCHMPXを起動します。
      $ chmpx -conf server.yaml
      起動時にMQ(Posix MQ関連)のエラーがでる場合には、ホストで利用できるMQ数を大きくします。
      (下記で指定しているMQ数は余裕を持った数字です。特権ユーザーにて設定してください。)
      # echo 1024 > /proc/sys/fs/mqueue/msg_max
    • K2HFTFUSESVRプログラムを起動します。
      $ k2hftfusesvr -conf server.yaml
  • 次は、Apacheの動作しているホストでプログラムを起動します。

    • 通信ミドルウエアCHMPXを起動します。
      $ chmpx -conf slave.yaml
    • K2HFTFUSEプログラムを起動し、マウントします。(今回はフォアグラウンドで起動しています)
      $ k2hftfuse /home/apache/logs -o conf=slave.yaml -f
  • マウントできているか確認します。
    • dfコマンドで確認します。
      $ df
      Filesystem      1K-blocks    Used  Available Use% Mounted on
      /dev/****        41152832 1736572   37617420   5% /
      none                    4       0          4   0% /sys/fs/cgroup
      none                 5120       0       5120   0% /run/lock
      tmpfs             2024060       0    2024060   0% /run/shm
      k2hftfuse      1073741824       0 1073741824   0% /home/apache/logs
  • Apacheを起動して、ログを出力させ、集約するホスト上でファイル内容が転送されているか確認してみてください。
    • tailコマンドで確認します。
      $ tail -f /home/apache/all/log/unify.log
  • Apacheを起動していなくても、Apache側(送信側)のホスト上で以下のようにすることで”TEST STRING”が上記のファイルに転送されていることを確認できます。
    • Apacheを使わないで確認します。
      $ echo "TEST STRING" > /home/apache/logs/logs/access.log

テストを終えて

以上のようにして、ファイル内容が転送できていることが確認できます。
K2HFTFUSEの優れている点は、転送元(プログラム)の変更をする必要がなく、設定により集約ホスト、経路を決定でき、確実で高速なファイル/メッセージの転送ができる点です。
また、転送先のホストで任意のプログラムを起動しておき、独自の処理(フィルタリングなど)も簡単にできます。

関連ライブラリ/システム

K2HFTFUSEは、独自のKVSライブラリ K2HASH(NoSQL Key Value Store library)、通信ミドルウエアシステム CHMPX(Consistent Hashing Mq inProcess data eXchange)、ロックライブラリ FULLOCK(Fast User Level LOCK library)、K2HASHライブラリ専用トランザクションプラグイン K2HTPDTOR(K2HASH Distributed Transaction Of Repeater)を利用しています。
これらは、公開されたK2HFTFUSEリポジトリにsubmoduleとして登録されています。
以下に、各プロダクトの簡単な説明をします。

FULLOCK

高速で安全な低レベルロックライブラリです。
プロセス間共有ロックオブジェクトをマルチプロセス/マルチスレッドから簡単に、かつ安全な利用がでます。
K2HFTFUSEを含む他のプロダクトから利用されています。

K2HASH

NoSQL(KVS:Key-Value Store)ライブラリです。
高速、大容量でマルチプロセス/マルチスレッドからの安全な利用ができます。
また独自のトランザクション処理、キューイングなどもできるライブラリです。
CHMPX、K2HFTFUSE、K2HTPDTORなどのプロダクトから利用される基礎ライブラリを提供します。

CHMPX

高速、大容量対応の通信ミドルウエアです。
Consistent Hasingをベースにサーバーノードを構築できる高速なネットワーク越しのプロセス間を提供します。
IPC over RPCの機能を提供します。

K2HTPDTOR

K2HASHライブラリのためのK2HASHトランザクションプラグインプログラム(シェアードライブラリ)です。
K2HASHデータの操作をトランザクションとして処理し、転送、保管、加工などができます。

K2HFTFUSEは、FUSE(Filesystem in Userspace)によるユーザースペースでのマウント機能を利用し、これを基本構成としていますが、主要機能は上述の各プロダクトにより実現できてます。

最後に

今後もK2HFTFUSEの改良などを継続していく予定です。
fluentdよりも重厚で、かつKafkaよりも簡単にデータ集約ができるシステムです。ぜひK2HFTFUSEをお試しください。
また、今後もK2HFTFUSEについての技術情報、関連プロダクトについての技術情報も公開していきたいと思います。

Yahoo! JAPANでは情報技術を駆使して人々や社会の課題を一緒に解決していける方を募集しています。詳しくは採用情報をご覧ください。

  • このエントリーをはてなブックマークに追加