001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import java.io.IOException;
021import org.apache.hadoop.hbase.HBaseConfiguration;
022import org.apache.hadoop.hbase.TableName;
023import org.apache.hadoop.hbase.client.Connection;
024import org.apache.hadoop.hbase.client.ConnectionFactory;
025import org.apache.hadoop.hbase.client.RegionLocator;
026import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
027import org.apache.hadoop.hbase.util.Bytes;
028import org.apache.hadoop.mapred.JobConf;
029import org.apache.hadoop.mapred.Partitioner;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * This is used to partition the output keys into groups of keys. Keys are grouped according to the
036 * regions that currently exist so that each reducer fills a single region so load is distributed.
037 * @param <K2>
038 * @param <V2>
039 */
040@InterfaceAudience.Public
041public class HRegionPartitioner<K2, V2> implements Partitioner<ImmutableBytesWritable, V2> {
042  private static final Logger LOG = LoggerFactory.getLogger(HRegionPartitioner.class);
043  // Connection and locator are not cleaned up; they just die when partitioner is done.
044  private Connection connection;
045  private RegionLocator locator;
046  private byte[][] startKeys;
047
048  @Override
049  public void configure(JobConf job) {
050    try {
051      this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
052      TableName tableName = TableName.valueOf(job.get(TableOutputFormat.OUTPUT_TABLE));
053      this.locator = this.connection.getRegionLocator(tableName);
054    } catch (IOException e) {
055      LOG.error(e.toString(), e);
056    }
057
058    try {
059      this.startKeys = this.locator.getStartKeys();
060    } catch (IOException e) {
061      LOG.error(e.toString(), e);
062    }
063  }
064
065  @Override
066  public int getPartition(ImmutableBytesWritable key, V2 value, int numPartitions) {
067    byte[] region = null;
068    // Only one region return 0
069    if (this.startKeys.length == 1) {
070      return 0;
071    }
072    try {
073      // Not sure if this is cached after a split so we could have problems
074      // here if a region splits while mapping
075      region = locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
076    } catch (IOException e) {
077      LOG.error(e.toString(), e);
078    }
079    for (int i = 0; i < this.startKeys.length; i++) {
080      if (Bytes.compareTo(region, this.startKeys[i]) == 0) {
081        if (i >= numPartitions) {
082          // cover if we have less reduces then regions.
083          return (Integer.toString(i).hashCode() & Integer.MAX_VALUE) % numPartitions;
084        }
085        return i;
086      }
087    }
088    // if above fails to find start key that match we need to return something
089    return 0;
090  }
091}