/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This is used to partition the output keys into groups of keys. Keys are grouped according to the
 * regions that currently exist so that each reducer fills a single region so load is distributed.
 * <p>
 * This class is not suitable as partitioner creating hfiles for incremental bulk loads as region
 * spread will likely change between time of hfile creation and load time. See
 * {@link org.apache.hadoop.hbase.tool.BulkLoadHFiles} and
 * <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.
 * </p>
 * @param <KEY>   The type of the key.
 * @param <VALUE> The type of the value.
 */
@InterfaceAudience.Public
public class HRegionPartitioner<KEY, VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
  implements Configurable {

  private static final Logger LOG = LoggerFactory.getLogger(HRegionPartitioner.class);
  private Configuration conf = null;
  // Connection and locator are not cleaned up; they just die when partitioner is done.
  private Connection connection;
  private RegionLocator locator;
  // Start keys obtained from the locator in setConf; stays null if that lookup failed.
  private byte[][] startKeys;

  /**
   * Gets the partition number for a given key (hence record) given the total number of partitions
   * i.e. number of reduce-tasks for the job.
   * <p>
   * The key is resolved to the start key of the region that currently holds it, and the partition
   * is the index of that start key within the start keys loaded by
   * {@link #setConf(Configuration)}. When that index is not a valid partition number (there are
   * fewer reducers than regions) it is folded into range by hashing.
   * </p>
   * <p>
   * Partition 0 is returned when the start keys could not be loaded, when there is at most one
   * region, when the region lookup for the key fails (the error is logged), or when the region's
   * start key matches none of the loaded start keys.
   * </p>
   * @param key           The key to be partitioned.
   * @param value         The entry value.
   * @param numPartitions The total number of partitions.
   * @return The partition number for the <code>key</code>.
   * @see org.apache.hadoop.mapreduce.Partitioner#getPartition( java.lang.Object, java.lang.Object,
   *      int)
   */
  @Override
  public int getPartition(ImmutableBytesWritable key, VALUE value, int numPartitions) {
    // Nothing to choose between: either setConf failed to load the start keys (already logged
    // there) or the table has at most one region.
    if (this.startKeys == null || this.startKeys.length <= 1) {
      return 0;
    }
    byte[] region;
    try {
      // Not sure if this is cached after a split so we could have problems
      // here if a region splits while mapping
      region = this.locator.getRegionLocation(key.get()).getRegion().getStartKey();
    } catch (IOException e) {
      LOG.error(e.toString(), e);
      // Without a region start key there is nothing to compare against; use the same fallback
      // as an unmatched key instead of dereferencing null below.
      return 0;
    }
    for (int i = 0; i < this.startKeys.length; i++) {
      if (Bytes.compareTo(region, this.startKeys[i]) == 0) {
        if (i >= numPartitions) {
          // cover the case where we have fewer reducers than regions.
          return (Integer.toString(i).hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
        return i;
      }
    }
    // if above fails to find start key that match we need to return something
    return 0;
  }

  /**
   * Returns the current configuration.
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to determine the start keys for the given table.
   * <p>
   * The table name is read from {@link TableOutputFormat#OUTPUT_TABLE}. If the connection, the
   * region locator or the start keys cannot be obtained, the error is logged and the start keys
   * are left unset, in which case {@link #getPartition(ImmutableBytesWritable, Object, int)}
   * returns partition 0 for every key.
   * </p>
   * @param configuration The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf( org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = HBaseConfiguration.create(configuration);
    try {
      this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
      TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
      this.locator = this.connection.getRegionLocator(tableName);
      // Kept inside the same try so a failure above can never lead to a call on a null locator.
      this.startKeys = this.locator.getStartKeys();
    } catch (IOException e) {
      LOG.error(e.toString(), e);
    }
  }
}