/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.util.Base64;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * A partitioner that takes start and end keys and uses
 * {@link Bytes#split(byte[], byte[], int)} to divide that range among the reducers, then
 * assigns each key to the reducer whose sub-range contains it.
 * <p>
 * Configure the range with {@link #setStartKey(Configuration, byte[])} and
 * {@link #setEndKey(Configuration, byte[])}. The older <code>hbase.simpletotalorder.start</code>
 * and <code>hbase.simpletotalorder.end</code> properties are still read as a fallback, but they
 * are deprecated. The end key needs to be exclusive; i.e. one larger than the biggest key in your
 * key space. When more than one reducer is used, a key that sorts before the start key, or at or
 * after the end key, is rejected with a {@link RuntimeException}.
 * <p>
 * You may be surprised at how this class partitions the space; it may not align with
 * preconceptions; e.g. a start key of zero and an end key of 100 divided in ten will not make
 * regions whose range is 0-10, 10-20, and so on. Make your own partitioner if you need the region
 * spacing to come out a particular way.
 *
 * @param <VALUE> type of the map output value; it plays no part in choosing the partition
 * @see #setStartKey(Configuration, byte[])
 * @see #setEndKey(Configuration, byte[])
 */
@InterfaceAudience.Public
public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
    implements Configurable {
  private final static Logger LOG = LoggerFactory.getLogger(SimpleTotalOrderPartitioner.class);

  /**
   * Legacy configuration key for the start key, parsed with {@link Bytes#toBytesBinary(String)}.
   *
   * @deprecated since 0.90.0; use {@link #setStartKey(Configuration, byte[])} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-1923">HBASE-1923</a>
   */
  @Deprecated
  public static final String START = "hbase.simpletotalorder.start";

  /**
   * Legacy configuration key for the end key, parsed with {@link Bytes#toBytesBinary(String)}.
   *
   * @deprecated since 0.90.0; use {@link #setEndKey(Configuration, byte[])} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-1923">HBASE-1923</a>
   */
  @Deprecated
  public static final String END = "hbase.simpletotalorder.end";

  /** Configuration key holding the Base64-encoded start key written by {@link #setStartKey}. */
  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
  /** Configuration key holding the Base64-encoded end key written by {@link #setEndKey}. */
  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";

  /** Configuration most recently passed to {@link #setConf(Configuration)}. */
  private Configuration c;
  /** Inclusive lower bound of the partitioned key space. */
  private byte [] startkey;
  /** Exclusive upper bound of the partitioned key space. */
  private byte [] endkey;
  /** Sorted partition boundaries computed for {@link #lastReduces} reducers. */
  private byte [][] splits;
  /** Reducer count {@link #splits} was computed for; -1 forces recomputation. */
  private int lastReduces = -1;

  /**
   * Stores the (inclusive) start key in {@code conf}, Base64-encoded.
   *
   * @param conf configuration to update
   * @param startKey first key of the key space
   */
  public static void setStartKey(Configuration conf, byte[] startKey) {
    conf.set(START_BASE64, Base64.getEncoder().encodeToString(startKey));
  }

  /**
   * Stores the (exclusive) end key in {@code conf}, Base64-encoded.
   *
   * @param conf configuration to update
   * @param endKey key one larger than the biggest key in the key space
   */
  public static void setEndKey(Configuration conf, byte[] endKey) {
    conf.set(END_BASE64, Base64.getEncoder().encodeToString(endKey));
  }

  /**
   * Reads the start key, preferring {@link #START_BASE64} over the deprecated {@link #START}.
   *
   * @return the start key, or {@code null} if neither property is set
   */
  @SuppressWarnings("deprecation")
  static byte[] getStartKey(Configuration conf) {
    return getKeyFromConf(conf, START_BASE64, START);
  }

  /**
   * Reads the end key, preferring {@link #END_BASE64} over the deprecated {@link #END}.
   *
   * @return the end key, or {@code null} if neither property is set
   */
  @SuppressWarnings("deprecation")
  static byte[] getEndKey(Configuration conf) {
    return getKeyFromConf(conf, END_BASE64, END);
  }

  /**
   * Looks up a key stored under {@code base64Key}. If it is absent, falls back to the legacy
   * {@code deprecatedKey}, whose value is parsed with {@link Bytes#toBytesBinary(String)}.
   *
   * @return the decoded key, or {@code null} if neither property is set
   * @throws IllegalArgumentException if the value under {@code base64Key} is not valid Base64
   */
  private static byte[] getKeyFromConf(Configuration conf,
      String base64Key, String deprecatedKey) {
    String encoded = conf.get(base64Key);
    if (encoded != null) {
      try {
        return Base64.getDecoder().decode(encoded);
      } catch (IllegalArgumentException e) {
        // The decoder's own message does not say which property was malformed.
        throw new IllegalArgumentException(
            "Invalid Base64 value for configuration " + base64Key, e);
      }
    }
    String oldStyleVal = conf.get(deprecatedKey);
    if (oldStyleVal == null) {
      return null;
    }
    LOG.warn("Using deprecated configuration {} - please use static accessor methods instead.",
        deprecatedKey);
    return Bytes.toBytesBinary(oldStyleVal);
  }

  /**
   * Returns the reducer index for {@code key}.
   * <p>
   * With a single reducer every key maps to 0. Otherwise the key must lie in
   * [start key, end key).
   *
   * @param key row key to place
   * @param value ignored
   * @param reduces number of reducers
   * @return a partition index in {@code [0, reduces)}
   * @throws RuntimeException if the key sorts before the start key or at/after the end key
   * @throws IllegalStateException if the key range could not be split
   */
  @Override
  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
      final int reduces) {
    if (reduces == 1) {
      return 0;
    }
    if (this.lastReduces != reduces) {
      computeSplits(reduces);
    }
    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(), key.getLength());
    if (pos < 0) {
      // No exact boundary match: binarySearch returned -(insertionPoint + 1). The key belongs
      // to the interval that begins at the boundary just before the insertion point.
      pos = -(pos + 1) - 1;
    }
    // pos < 0: the key sorts before the start key.
    // pos >= splits.length - 1: the key equals or exceeds the exclusive end key (the last
    // boundary). Returning that index would produce an out-of-range partition number.
    if (pos < 0 || pos >= this.splits.length - 1) {
      throw new RuntimeException("Key outside start/stop range: " + key.toString());
    }
    return pos;
  }

  /**
   * Recomputes {@link #splits} for {@code reduces} reducers.
   *
   * {@code Bytes.split(start, end, reduces - 1)} yields the start key, {@code reduces - 1}
   * intermediate keys and the end key, i.e. {@code reduces} half-open intervals.
   *
   * @throws IllegalStateException if {@code Bytes.split} could not divide the range
   */
  private void computeSplits(final int reduces) {
    byte[][] newSplits = Bytes.split(this.startkey, this.endkey, reduces - 1);
    if (newSplits == null) {
      throw new IllegalStateException("Unable to split key range [" +
          Bytes.toStringBinary(this.startkey) + ", " + Bytes.toStringBinary(this.endkey) +
          ") into " + reduces + " partitions");
    }
    for (byte[] split : newSplits) {
      LOG.info(Bytes.toStringBinary(split));
    }
    // Only cache once the split has succeeded, so a failure is retried on the next call.
    this.splits = newSplits;
    this.lastReduces = reduces;
  }

  /** @return the configuration most recently passed to {@link #setConf(Configuration)} */
  @Override
  public Configuration getConf() {
    return this.c;
  }

  /**
   * Stores {@code conf} and reads the start and end keys from it.
   *
   * @throws RuntimeException if either the start key or the end key is not configured
   */
  @Override
  public void setConf(Configuration conf) {
    this.c = conf;
    this.startkey = getStartKey(conf);
    this.endkey = getEndKey(conf);
    if (startkey == null || endkey == null) {
      throw new RuntimeException(this.getClass() + " not configured");
    }
    LOG.info("startkey={}, endkey={}", Bytes.toStringBinary(startkey),
        Bytes.toStringBinary(endkey));
    // Start / end keys may have changed: drop boundaries computed for the previous range.
    this.splits = null;
    this.lastReduces = -1;
  }
}