001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.IOException;
022import org.apache.yetus.audience.InterfaceAudience;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseConfiguration;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.client.ConnectionFactory;
031import org.apache.hadoop.hbase.client.RegionLocator;
032import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
033import org.apache.hadoop.hbase.mapred.TableOutputFormat;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.mapreduce.Partitioner;
036
037/**
038 * This is used to partition the output keys into groups of keys.
039 * Keys are grouped according to the regions that currently exist
040 * so that each reducer fills a single region so load is distributed.
041 *
042 * <p>This class is not suitable as partitioner creating hfiles
043 * for incremental bulk loads as region spread will likely change between time of
044 * hfile creation and load time. See {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles}
045 * and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
046 *
047 * @param <KEY>  The type of the key.
048 * @param <VALUE>  The type of the value.
049 */
050@InterfaceAudience.Public
051public class HRegionPartitioner<KEY, VALUE>
052extends Partitioner<ImmutableBytesWritable, VALUE>
053implements Configurable {
054
055  private static final Logger LOG = LoggerFactory.getLogger(HRegionPartitioner.class);
056  private Configuration conf = null;
057  // Connection and locator are not cleaned up; they just die when partitioner is done.
058  private Connection connection;
059  private RegionLocator locator;
060  private byte[][] startKeys;
061
062  /**
063   * Gets the partition number for a given key (hence record) given the total
064   * number of partitions i.e. number of reduce-tasks for the job.
065   *
066   * <p>Typically a hash function on a all or a subset of the key.</p>
067   *
068   * @param key  The key to be partitioned.
069   * @param value  The entry value.
070   * @param numPartitions  The total number of partitions.
071   * @return The partition number for the <code>key</code>.
072   * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
073   *   java.lang.Object, java.lang.Object, int)
074   */
075  @Override
076  public int getPartition(ImmutableBytesWritable key,
077      VALUE value, int numPartitions) {
078    byte[] region = null;
079    // Only one region return 0
080    if (this.startKeys.length == 1){
081      return 0;
082    }
083    try {
084      // Not sure if this is cached after a split so we could have problems
085      // here if a region splits while mapping
086      region = this.locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
087    } catch (IOException e) {
088      LOG.error(e.toString(), e);
089    }
090    for (int i = 0; i < this.startKeys.length; i++){
091      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
092        if (i >= numPartitions-1){
093          // cover if we have less reduces then regions.
094          return (Integer.toString(i).hashCode()
095              & Integer.MAX_VALUE) % numPartitions;
096        }
097        return i;
098      }
099    }
100    // if above fails to find start key that match we need to return something
101    return 0;
102  }
103
104  /**
105   * Returns the current configuration.
106   *
107   * @return The current configuration.
108   * @see org.apache.hadoop.conf.Configurable#getConf()
109   */
110  @Override
111  public Configuration getConf() {
112    return conf;
113  }
114
115  /**
116   * Sets the configuration. This is used to determine the start keys for the
117   * given table.
118   *
119   * @param configuration  The configuration to set.
120   * @see org.apache.hadoop.conf.Configurable#setConf(
121   *   org.apache.hadoop.conf.Configuration)
122   */
123  @Override
124  public void setConf(Configuration configuration) {
125    this.conf = HBaseConfiguration.create(configuration);
126    try {
127      this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
128      TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
129      this.locator = this.connection.getRegionLocator(tableName);
130    } catch (IOException e) {
131      LOG.error(e.toString(), e);
132    }
133    try {
134      this.startKeys = this.locator.getStartKeys();
135    } catch (IOException e) {
136      LOG.error(e.toString(), e);
137    }
138  }
139}