001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapred; 020 021import java.io.IOException; 022import org.apache.hadoop.hbase.HBaseConfiguration; 023import org.apache.hadoop.hbase.TableName; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.apache.hadoop.hbase.client.Connection; 028import org.apache.hadoop.hbase.client.ConnectionFactory; 029import org.apache.hadoop.hbase.client.RegionLocator; 030import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.mapred.JobConf; 033import org.apache.hadoop.mapred.Partitioner; 034 035/** 036 * This is used to partition the output keys into groups of keys. 037 * Keys are grouped according to the regions that currently exist 038 * so that each reducer fills a single region so load is distributed. 039 * 040 * @param <K2> 041 * @param <V2> 042 */ 043@InterfaceAudience.Public 044public class HRegionPartitioner<K2,V2> 045implements Partitioner<ImmutableBytesWritable, V2> { 046 private static final Logger LOG = LoggerFactory.getLogger(HRegionPartitioner.class); 047 // Connection and locator are not cleaned up; they just die when partitioner is done. 048 private Connection connection; 049 private RegionLocator locator; 050 private byte[][] startKeys; 051 052 @Override 053 public void configure(JobConf job) { 054 try { 055 this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); 056 TableName tableName = TableName.valueOf(job.get(TableOutputFormat.OUTPUT_TABLE)); 057 this.locator = this.connection.getRegionLocator(tableName); 058 } catch (IOException e) { 059 LOG.error(e.toString(), e); 060 } 061 062 try { 063 this.startKeys = this.locator.getStartKeys(); 064 } catch (IOException e) { 065 LOG.error(e.toString(), e); 066 } 067 } 068 069 @Override 070 public int getPartition(ImmutableBytesWritable key, V2 value, int numPartitions) { 071 byte[] region = null; 072 // Only one region return 0 073 if (this.startKeys.length == 1){ 074 return 0; 075 } 076 try { 077 // Not sure if this is cached after a split so we could have problems 078 // here if a region splits while mapping 079 region = locator.getRegionLocation(key.get()).getRegionInfo().getStartKey(); 080 } catch (IOException e) { 081 LOG.error(e.toString(), e); 082 } 083 for (int i = 0; i < this.startKeys.length; i++){ 084 if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){ 085 if (i >= numPartitions){ 086 // cover if we have less reduces then regions. 087 return (Integer.toString(i).hashCode() 088 & Integer.MAX_VALUE) % numPartitions; 089 } 090 return i; 091 } 092 } 093 // if above fails to find start key that match we need to return something 094 return 0; 095 } 096}