ヤフー株式会社は、2023年10月1日にLINEヤフー株式会社になりました。LINEヤフー株式会社の新しいブログはこちらです。LINEヤフー Tech Blog

テクノロジー

Hadoopを使いこなす(2)

こんにちは、 前回のHadoopの記事 に引き続き、MapReduceのカスタマイズポイントを解説していきます。

前回の記事の図や、表などを参照しながら読み進めていただければと思います。

MapperやReducerの流れの制御

Mapperの実行の流れは、デフォルトでは、初期化処理を行った後、map関数を繰り返し実行し、終了処理を行うようになっていますが、この流れ自体を制御することができます。

古いAPIでは、MapRunnerを通じてこの流れを制御できますが、0.20.0からの新しいAPIでは単純にMapperクラスのrun関数をオーバーライドすることで、行えます。
デフォルトのrun関数は以下の通りです。

public  void run(Context context) throws  IOException, InterruptedException {
  setup(context);   // 初期化処理
   while  (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);  // map関数を繰り返す
   }
  cleanup(context);  // 終了処理
 }

初期化処理を行い、map関数を繰り返し実行し、終了処理する流れがそのまま書かれています。

例えば、WEBページのクロールなど、一回のmap関数の実行に時間がかかる処理の場合、map関数をさらに複数スレッドで実行した方が効率的です。
そうした場合に、このrun関数を活用できます。

なお、Reducerの流れの制御も同様にrun関数を上書きすることで行えますが、新しいAPIでしか行えず、古いAPIではMapRunnerに相当するものがありません。

Partitioner

Partitionerはmap関数の出力を、Partitionに分割し、それぞれをReduceTaskに割り当てます。
デフォルトのPartitionerはHashPartitionerで、Keyのハッシュ値にもとづいて分割します。

public  class HashPartitioner<K2, V2> implements  Partitioner<K2, V2> {
  public  void configure(JobConf job) {}
  public  int  getPartition(K2 key, V2 value,
                          int  numReduceTasks) {
    return  (key.hashCode() & Integer .MAX_VALUE) % numReduceTasks;  // Keyのハッシュ値をReduceTaskの数で割った余り
   }
}

getPartition関数は、Keyのハッシュ値をReduceTaskの数で割った余りを返し、Partitionを決定します。
つまり、同じKeyは必ず同じReduceTaskで処理されることになります。

Partitionerをカスタマイズする例をあげてみます。

map関数の出力Keyが、ファイル名だったとして、同じ拡張子のファイルは、同じReduceTaskで処理したいとしましょう。
(エラー処理などは省略します)

public  int  getPartition(Text key, Text value,
                        int  numReduceTasks) {
  String  extension = key.toString().replaceAll("^.*?\\." ,"");  // ファイル名から拡張子を抽出
   return  ((new  Text(extension)).hashCode() & Integer .MAX_VALUE) % numReduceTasks;
}

このようにKeyのファイル名から拡張子を抽出し、拡張子のハッシュ値をもとに分割すれば、
同じ拡張子のファイルは必ず同じReduceTaskで処理されるというわけです。

その後のReducerでは、同じ拡張子のファイルの長さの合計を計算する、といった処理が考えられますが、それには、もう一工夫必要です。
reduce関数に渡される、Valueのグルーピングを拡張子ごとに行う必要がありますが、これは後ほど解説します。

Combiner

Combinerは、Mapの出力をさらに集計する処理のことでMapTask内で行われます。Combinerの新たな出力が、Reducerの入力として渡されます。

例えば、 初回のHadoopの記事 でも解説したWordCountの例をあげてみましょう。
Combinerを使わないと以下のようになります。

map関数の出力のKeyに単語の名前、Valueには1がセットされ、map関数の出力がそのままreduce関数に渡されます。
reduce関数では単語の数がカウントされて、<manmos,8> と単語の数の合計を出力します。

では、combinerを使うとどうなるでしょうか?

map関数の出力は、MapTask内でCombinerに渡されます。
Combinerでは、そのMapTask内での単語の数の合計を出力し、reduce関数に渡します。
reduce関数では、さらにそれらを足し合わせて、全体の単語の数の合計を出力するというわけです。

Combinerは、ReduceTaskへの転送量を減らし、Reducerの計算量を減らすことで、全体の高速化を図る目的で使われます。

なお、Combinerはオプションであり、必ずしも使うべきではないケースも多いので注意して下さい。

DistributedCache

DistributedCacheは、処理に必要なファイルを各マシンにコピーしてキャッシュする仕組みです。
配布は、Taskが実行される前に、一回だけ行われます。
キャッシュしたファイルは、Mapper、Reducerから自由に利用することができますが、読み込み専用で、変更してはいけません。

単にファイルをキャッシュするだけではなく、zip/tar/tgz/tar.gzなどの圧縮されたファイルは展開してキャッシュされます。

また、jarファイルを配布すると、自動でクラスパスに追加してくれ、MapperやReducerで使うことができます。
.soなどのネイティブライブラリも同様に配布できます。

例えば、jarファイルを配布する例は以下の通りです。

DistributedCache.addFileToClassPath(new  Path("test.jar" ), job);  // jobはJobConfファイル 

なお、MapperやReducerで共通で扱いたい文字列などは、JobConfに直接定義してしまうのが便利です。

job.set("my.string.conf" , "test" );

MapやReduceの初期化処理などで、以下のようにしてJobConfに定義した文字列を取得できます。

public  static  class Map extends  ... {
  private  String  test;

  public  void configure(JobConf job) {
    // JobConfに設定したデータを読み込む
     test = job.get("my.string.conf" );  // "test" が入っている
   }

  public  void map(...) throws  IOException {
    // 何か処理
   }
}

Comparator

Comparatorは、Keyのソートや、ReducerでのValueのグルーピングをカスタマイズできます。
Key同士をどう比較するかを定義します。

Keyのソートや、Valueのグルーピングに用いる、RawComparatorのインターフェイスは以下です。

public  interface  RawComparator<T> extends  Comparator<T> {
  public  int  compare(byte [] b1, int  s1, int  l1, byte [] b2, int  s2, int  l2);
}

Keyのソートや、Valueのグルーピングでは、このcompare関数が呼ばれてバイト単位で比較されることになります。
RawComparatorのデフォルトの実装は、WritableComparatorです。

private  final  WritableComparable key1;
private  final  WritableComparable key2;

public  int  compare(byte [] b1, int  s1, int  l1, byte [] b2, int  s2, int  l2) {
  try  {
    buffer.reset(b1, s1, l1);  // key1を抽出
     key1.readFields(buffer);
    buffer.reset(b2, s2, l2);  // key2を抽出
     key2.readFields(buffer);

  } catch  (IOException e) {
    throw  new  RuntimeException(e);
  }
  return  compare(key1, key2);  // 後のcompare関数を呼び出す
 }

public  int  compare(WritableComparable a, WritableComparable b) {
  return  a.compareTo(b);
}

WritableComparatorでは、バイト列からKeyを抽出し、そのKeyのクラスのcompareTo関数を用いて比較するようになっています。

カスタマイズしたComparatorを、Valueのグルーピングに用いる例をあげてみます。

先ほどのPartitionerのときの例では、同じ拡張子のファイルを同じReduceTaskに割り当てる処理を行いました。
その後のReducerでは拡張子ごとにファイルのバイト数の合計などを求める、などの処理が考えられますが、その場合、拡張子ごとにValueをグルーピングする必要があります。

拡張子ごとにグルーピングするComparatorクラスは以下の通りです。
なおKeyにはファイル名がセットされています。

public  static  class FileExtensionComparator extends  WritableComparator {
  protected  FileExtensionComparator() {
    super (Text.class, true );
  }
  public  int  compare(WritableComparable w1, WritableComparable w2) {
    Text t1 = (Text) w1;
    Text t2 = (Text) w2;
    return  getExtension(t1).compareTo(getExtension(t2));
  }
  private  Text getExtension(Text t){
    return  new  Text( t.toString().replaceAll("^.*?\\." ,"") );  // ファイル名から拡張子を抽出
   }
}

まず比較したいKeyのクラス(Text)を指定して、WritableComparatorのコンストラクタを呼び出します。
比較する際にcompare関数が呼び出されるので、ファイル名から拡張子を抽出して、TextのcompareToメソッドを呼び出しています。
このクラスをJobConfのsetOutputValueGroupingComparator関数で指定すれば、拡張子ごとにグルーピングされるというわけです。

なお、実はこの方法だと、一度Keyのバイト列をデシリアイズしてから比較しているので、低速です。
MapReduceでは、この比較のスピードは全体の処理時間に大きく影響します。
例えば、TextのデフォルトのComparatorでは、バイト列をそのまま比較する最適化を行っています。

public  static  class Comparator extends  WritableComparator {
  public  Comparator() {
    super (Text.class);
  }
  public  int  compare(byte [] b1, int  s1, int  l1,
                     byte [] b2, int  s2, int  l2) {
    int  n1 = WritableUtils.decodeVIntSize(b1[s1]);  // ヘッダのバイト数を求める
     int  n2 = WritableUtils.decodeVIntSize(b2[s2]);  // ヘッダのバイト数を求める
     return  compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);  // ヘッダのバイト数を除外して、バイト列のまま比較
   }
}

WritableComparatorのcompareBytesクラスを呼び出し、バイト列のまま比較を行っています。
なお、Textは、文字列のバイトの長さを、先頭に可変長で格納しています。
そこで、WritableUtilsクラスのdecodeVIntSize関数で、長さの情報を格納しているヘッダ自体のバイト数を求めて、そのヘッダを除外して比較を行っています。

このComparatorをTextのデフォルトのComparatorとして登録するには、

static  {
  WritableComparator.define(Text.class, new  Comparator());
}

のようにstaticブロックで、WritableComparatorのdefine関数を呼び出します。

OutputFormat

OutputFormatのインターフェイスは以下のようになっています。

public  interface  OutputFormat<K, V> {
  RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                     String  name, Progressable progress)
  throws  IOException;
  void checkOutputSpecs(FileSystem ignored, JobConf job) throws  IOException;
}

InputFormatと同様に、getRecordWriter関数で、KeyとValueを受け取って実際にデータを出力するRecordWriterクラスを返します。
RecordWriterのインターフェイスは以下です。

public  interface  RecordWriter<K, V> {
  void write(K key, V value) throws  IOException;
  void close(Reporter reporter) throws  IOException;
}

write関数が繰り返し呼ばれて、データが書き込まれます。

例えば、Key/ValueをXMLに変換して書き出したい、HDFSではなくDBに書き込みたい、などの場合にOutputFormatのカスタマイズは有効です。

なお、複数のファイル、場所にMapReduceの出力を書き込める、 MultipleOutputFormat や、 MultipleOutputs という便利な仕組みも用意されています。

Writable

WritableクラスはMapReduceのKeyやValueとして使われるクラスです。
例えば、テキストにはTextクラス、intにはIntWritableといった具合にWritableクラスが用意されています。

Writableクラスは、マシン間で転送可能なバイトストリームへのシリアライズ、デシリアライズを行います。
Javaで用意されているものより、コンパクトで高速に処理できるのが特徴です。

Writableクラスのインターフェイスは以下の通りです。

public  interface  Writable {
  void readFields(DataInput in);
  void write(DataOutput out);
}

readFields関数はバイトストリームのDataInputを読み取り、デシリアイズを行います。
また、write関数は、シリアイズを行い、バイトストリームのDataOutputに書き出します。

例えば、緯度経度を扱うWritableクラスは以下の通りです。
MapReduceのKeyとして扱うためには、比較も可能であるようにWritableComparableインターフェイスを実装します。

public  class LatLon implements  WritableComparable {
  public  double  lat;
  public  double  lon;
  public  LatLonDistance latLonDistance;
  public  double  baseLat=35.681382;  // 東京駅の緯度
  public  double  baseLon=139.766084;  // 東京駅の経度
 
  public  LatLon(double  lat, double  lon) {
    this .lat = lat;
    this .lon = lon;
    latLonDistance = new  LatLonDistance(baseLat,baseLon);
  }

  public  LatLon() {
    this (0.0f, 0.0f);
  }

  // シリアライズして書き出し
   public  void write(DataOutput out) throws  IOException {
    out.writeDouble(lat);
    out.writeDouble(lon);
  }

  // デシリアライズして読み込み
   public  void readFields(DataInput in) throws  IOException {    
    lat = in.readDouble();
    lon = in.readDouble();
  }

  public  String  toString() {
    return  Double .toString(lat) + ", "  + Double .toString(lon);
  }

  // 比較
   public  int  compareTo(LatLon other) {
    double  myDistance = latLonDistance.calc(lat,lon);  // 東京駅からの距離
     double  otherDistance = latLonDistance.calc(other.lat,other.lon);  // 東京駅からの距離
     return  -Double .compare(myDistance, otherDistance);  // 距離の逆順で比較(東京駅からの距離が近い方が大きい値)
   }

  public  boolean  equals(Object  o) {
    if  (!(o instanceof  LatLon)) return  false ;
    LatLon other = (LatLon) o;
    return  this .lat == other.lat && this .lon == other.lon;
  }
  
  // Partitionerなどに使うHash値を求める
   public  int  hashCode() {
    return  (int ) (Double .doubleToLongBits(lat) ^ Double .doubleToLongBits(lon));
  }
}

write関数では、DataOutputのwriteDouble関数を呼び出しで、バイトストリームへの書き出しを行っています。
また、readFields関数では、DataInputのreadDouble関数でバイトストリームから読みだし、緯度経度を設定しています。

比較は、東京駅からの距離の逆順で行うようになっています。(東京駅からの距離が近い方が大きい値)
コンストラクタで、東京駅の緯度経度を指定してLatLonDistanceクラスを初期化し、compareTo関数内で、東京駅からの距離を計算しています。
なお、高速化のためには前述したComparatorを実装する必要があります。

また、Partitionerで呼び出される、hashCode関数なども必要です。

次回は

以上でカスタマイズポイントの解説は終わりです。
次回は、実際のサービスでの活用事例をあげ、今までに解説したカスタマイズポイントがどういじられているかを紹介する予定です。

P.S.
Hadoop Hack Night もよろしくお願いします!

(R&D統括本部 吉田一星)

追記
Hadoopを使いこなす(1)
Hadoopを使いこなす(3)

こちらの記事のご感想を聞かせください。

  • 学びがある
  • わかりやすい
  • 新しい視点

ご感想ありがとうございました

このページの先頭へ