001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configurable;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.HBaseConfiguration;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.hadoop.hbase.client.Connection;
026import org.apache.hadoop.hbase.client.ConnectionFactory;
027import org.apache.hadoop.hbase.client.RegionLocator;
028import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
029import org.apache.hadoop.hbase.mapred.TableOutputFormat;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.hadoop.mapreduce.Partitioner;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * This is used to partition the output keys into groups of keys. Keys are grouped according to the
038 * regions that currently exist so that each reducer fills a single region so load is distributed.
039 * <p>
040 * This class is not suitable as partitioner creating hfiles for incremental bulk loads as region
041 * spread will likely change between time of hfile creation and load time. See
042 * {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles} and
043 * <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.
044 * </p>
045 * @param <KEY>   The type of the key.
046 * @param <VALUE> The type of the value.
047 */
048@InterfaceAudience.Public
049public class HRegionPartitioner<KEY, VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
050  implements Configurable {
051
052  private static final Logger LOG = LoggerFactory.getLogger(HRegionPartitioner.class);
053  private Configuration conf = null;
054  // Connection and locator are not cleaned up; they just die when partitioner is done.
055  private Connection connection;
056  private RegionLocator locator;
057  private byte[][] startKeys;
058
059  /**
060   * Gets the partition number for a given key (hence record) given the total number of partitions
061   * i.e. number of reduce-tasks for the job.
062   * <p>
063   * Typically a hash function on a all or a subset of the key.
064   * </p>
065   * @param key           The key to be partitioned.
066   * @param value         The entry value.
067   * @param numPartitions The total number of partitions.
068   * @return The partition number for the <code>key</code>.
069   * @see org.apache.hadoop.mapreduce.Partitioner#getPartition( java.lang.Object, java.lang.Object,
070   *      int)
071   */
072  @Override
073  public int getPartition(ImmutableBytesWritable key, VALUE value, int numPartitions) {
074    byte[] region = null;
075    // Only one region return 0
076    if (this.startKeys.length == 1) {
077      return 0;
078    }
079    try {
080      // Not sure if this is cached after a split so we could have problems
081      // here if a region splits while mapping
082      region = this.locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
083    } catch (IOException e) {
084      LOG.error(e.toString(), e);
085    }
086    for (int i = 0; i < this.startKeys.length; i++) {
087      if (Bytes.compareTo(region, this.startKeys[i]) == 0) {
088        if (i >= numPartitions) {
089          // cover if we have less reduces then regions.
090          return (Integer.toString(i).hashCode() & Integer.MAX_VALUE) % numPartitions;
091        }
092        return i;
093      }
094    }
095    // if above fails to find start key that match we need to return something
096    return 0;
097  }
098
099  /**
100   * Returns the current configuration.
101   * @return The current configuration.
102   * @see org.apache.hadoop.conf.Configurable#getConf()
103   */
104  @Override
105  public Configuration getConf() {
106    return conf;
107  }
108
109  /**
110   * Sets the configuration. This is used to determine the start keys for the given table.
111   * @param configuration The configuration to set.
112   * @see org.apache.hadoop.conf.Configurable#setConf( org.apache.hadoop.conf.Configuration)
113   */
114  @Override
115  public void setConf(Configuration configuration) {
116    this.conf = HBaseConfiguration.create(configuration);
117    try {
118      this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
119      TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
120      this.locator = this.connection.getRegionLocator(tableName);
121    } catch (IOException e) {
122      LOG.error(e.toString(), e);
123    }
124    try {
125      this.startKeys = this.locator.getStartKeys();
126    } catch (IOException e) {
127      LOG.error(e.toString(), e);
128    }
129  }
130}