001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.util.Base64; 021import org.apache.hadoop.conf.Configurable; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 024import org.apache.hadoop.hbase.util.Bytes; 025import org.apache.hadoop.mapreduce.Partitioner; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * A partitioner that takes start and end keys and uses bigdecimal to figure which reduce a key 032 * belongs to. Pass the start and end keys in the Configuration using 033 * <code>hbase.simpletotalorder.start</code> and <code>hbase.simpletotalorder.end</code>. The end 034 * key needs to be exclusive; i.e. one larger than the biggest key in your key space. You may be 035 * surprised at how this class partitions the space; it may not align with preconceptions; e.g. a 036 * start key of zero and an end key of 100 divided in ten will not make regions whose range is 0-10, 037 * 10-20, and so on. Make your own partitioner if you need the region spacing to come out a 038 * particular way. 039 * @param <VALUE> 040 * @see #START 041 * @see #END 042 */ 043@InterfaceAudience.Public 044public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE> 045 implements Configurable { 046 private final static Logger LOG = LoggerFactory.getLogger(SimpleTotalOrderPartitioner.class); 047 048 /** 049 * @deprecated since 0.90.0 050 * @see <a href="https://issues.apache.org/jira/browse/HBASE-1923">HBASE-1923</a> 051 */ 052 @Deprecated 053 public static final String START = "hbase.simpletotalorder.start"; 054 055 /** 056 * @deprecated since 0.90.0 057 * @see <a href="https://issues.apache.org/jira/browse/HBASE-1923">HBASE-1923</a> 058 */ 059 @Deprecated 060 public static final String END = "hbase.simpletotalorder.end"; 061 062 static final String START_BASE64 = "hbase.simpletotalorder.start.base64"; 063 static final String END_BASE64 = "hbase.simpletotalorder.end.base64"; 064 065 private Configuration c; 066 private byte[] startkey; 067 private byte[] endkey; 068 private byte[][] splits; 069 private int lastReduces = -1; 070 071 public static void setStartKey(Configuration conf, byte[] startKey) { 072 conf.set(START_BASE64, Bytes.toString(Base64.getEncoder().encode(startKey))); 073 } 074 075 public static void setEndKey(Configuration conf, byte[] endKey) { 076 conf.set(END_BASE64, Bytes.toString(Base64.getEncoder().encode(endKey))); 077 } 078 079 @SuppressWarnings("deprecation") 080 static byte[] getStartKey(Configuration conf) { 081 return getKeyFromConf(conf, START_BASE64, START); 082 } 083 084 @SuppressWarnings("deprecation") 085 static byte[] getEndKey(Configuration conf) { 086 return getKeyFromConf(conf, END_BASE64, END); 087 } 088 089 private static byte[] getKeyFromConf(Configuration conf, String base64Key, String deprecatedKey) { 090 String encoded = conf.get(base64Key); 091 if (encoded != null) { 092 return Base64.getDecoder().decode(encoded); 093 } 094 String oldStyleVal = conf.get(deprecatedKey); 095 if (oldStyleVal == null) { 096 return null; 097 } 098 LOG.warn("Using deprecated configuration " + deprecatedKey 099 + " - please use static accessor methods instead."); 100 return Bytes.toBytesBinary(oldStyleVal); 101 } 102 103 @Override 104 public int getPartition(final ImmutableBytesWritable key, final VALUE value, final int reduces) { 105 if (reduces == 1) return 0; 106 if (this.lastReduces != reduces) { 107 this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1); 108 for (int i = 0; i < splits.length; i++) { 109 LOG.info(Bytes.toStringBinary(splits[i])); 110 } 111 this.lastReduces = reduces; 112 } 113 int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(), key.getLength()); 114 // Below code is from hfile index search. 115 if (pos < 0) { 116 pos++; 117 pos *= -1; 118 if (pos == 0) { 119 // falls before the beginning of the file. 120 throw new RuntimeException("Key outside start/stop range: " + key.toString()); 121 } 122 pos--; 123 } 124 return pos; 125 } 126 127 @Override 128 public Configuration getConf() { 129 return this.c; 130 } 131 132 @Override 133 public void setConf(Configuration conf) { 134 this.c = conf; 135 this.startkey = getStartKey(conf); 136 this.endkey = getEndKey(conf); 137 if (startkey == null || endkey == null) { 138 throw new RuntimeException(this.getClass() + " not configured"); 139 } 140 LOG.info( 141 "startkey=" + Bytes.toStringBinary(startkey) + ", endkey=" + Bytes.toStringBinary(endkey)); 142 // Reset last reduces count on change of Start / End key 143 this.lastReduces = -1; 144 } 145}