View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.util.TreeSet;
22  
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.classification.InterfaceStability;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
27  import org.apache.hadoop.mapreduce.Reducer;
28  
29  /**
30   * Emits sorted KeyValues.
31   * Reads in all KeyValues from passed Iterator, sorts them, then emits
32   * KeyValues in sorted order.  If lots of columns per row, it will use lots of
33   * memory sorting.
34   * @see HFileOutputFormat
35   */
36  @InterfaceAudience.Public
37  @InterfaceStability.Stable
38  public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
39    protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
40        org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
41    throws java.io.IOException, InterruptedException {
42      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
43      for (KeyValue kv: kvs) {
44        try {
45          map.add(kv.clone());
46        } catch (CloneNotSupportedException e) {
47          throw new java.io.IOException(e);
48        }
49      }
50      context.setStatus("Read " + map.getClass());
51      int index = 0;
52      for (KeyValue kv: map) {
53        context.write(row, kv);
54        if (++index % 100 == 0) context.setStatus("Wrote " + index);
55      }
56    }
57  }