View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.HBaseConfiguration;
26  import org.apache.hadoop.hbase.TableName;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.classification.InterfaceStability;
29  import org.apache.hadoop.hbase.client.Connection;
30  import org.apache.hadoop.hbase.client.ConnectionFactory;
31  import org.apache.hadoop.hbase.client.RegionLocator;
32  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.mapred.JobConf;
35  import org.apache.hadoop.mapred.Partitioner;
36  
37  
38  /**
39   * This is used to partition the output keys into groups of keys.
40   * Keys are grouped according to the regions that currently exist
41   * so that each reducer fills a single region so load is distributed.
42   *
43   * @param <K2>
44   * @param <V2>
45   */
46  @InterfaceAudience.Public
47  @InterfaceStability.Stable
48  public class HRegionPartitioner<K2,V2>
49  implements Partitioner<ImmutableBytesWritable, V2> {
50    private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
51    // Connection and locator are not cleaned up; they just die when partitioner is done.
52    private Connection connection;
53    private RegionLocator locator;
54    private byte[][] startKeys;
55  
56    public void configure(JobConf job) {
57      try {
58        this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
59        TableName tableName = TableName.valueOf(job.get(TableOutputFormat.OUTPUT_TABLE));
60        this.locator = this.connection.getRegionLocator(tableName);
61      } catch (IOException e) {
62        LOG.error(e);
63      }
64  
65      try {
66        this.startKeys = this.locator.getStartKeys();
67      } catch (IOException e) {
68        LOG.error(e);
69      }
70    }
71  
72    public int getPartition(ImmutableBytesWritable key, V2 value, int numPartitions) {
73      byte[] region = null;
74      // Only one region return 0
75      if (this.startKeys.length == 1){
76        return 0;
77      }
78      try {
79        // Not sure if this is cached after a split so we could have problems
80        // here if a region splits while mapping
81        region = locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
82      } catch (IOException e) {
83        LOG.error(e);
84      }
85      for (int i = 0; i < this.startKeys.length; i++){
86        if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
87          if (i >= numPartitions-1){
88            // cover if we have less reduces then regions.
89            return (Integer.toString(i).hashCode()
90                & Integer.MAX_VALUE) % numPartitions;
91          }
92          return i;
93        }
94      }
95      // if above fails to find start key that match we need to return something
96      return 0;
97    }
98  }