/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.util.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A partitioner that takes start and end keys and uses bigdecimal to figure which reduce a key
 * belongs to. Pass the start and end keys in the Configuration using {@value #START_BASE64} and
 * {@value #END_BASE64}. The end key needs to be exclusive; i.e. one larger than the biggest key in
 * your key space. You may be surprised at how this class partitions the space; it may not align
 * with preconceptions; e.g. a start key of zero and an end key of 100 divided in ten will not make
 * regions whose range is 0-10, 10-20, and so on. Make your own partitioner if you need the region
 * spacing to come out a particular way.
 * @param <VALUE> type of the values being partitioned; the value is not consulted when choosing a
 *                partition
 * @see #START_BASE64
 * @see #END_BASE64
 */
@InterfaceAudience.Public
public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
  implements Configurable {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleTotalOrderPartitioner.class);

  /** Configuration key holding the Base64-encoded start key. */
  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
  /** Configuration key holding the Base64-encoded end key. */
  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";

  private Configuration c;
  private byte[] startkey;
  private byte[] endkey;
  /** Split boundaries computed for {@link #lastReduces} reducers; rebuilt when that changes. */
  private byte[][] splits;
  /** Reducer count {@link #splits} was computed for; -1 forces a recompute on next use. */
  private int lastReduces = -1;

  /**
   * Stores the start key in the configuration, Base64-encoded, under {@link #START_BASE64}.
   * @param conf     configuration to write to
   * @param startKey raw start key bytes
   */
  public static void setStartKey(Configuration conf, byte[] startKey) {
    // Base64 output is pure ASCII, so encodeToString yields the same text as decoding the
    // encoded bytes by hand.
    conf.set(START_BASE64, Base64.getEncoder().encodeToString(startKey));
  }

  /**
   * Stores the end key in the configuration, Base64-encoded, under {@link #END_BASE64}.
   * @param conf   configuration to write to
   * @param endKey raw end key bytes
   */
  public static void setEndKey(Configuration conf, byte[] endKey) {
    conf.set(END_BASE64, Base64.getEncoder().encodeToString(endKey));
  }

  /**
   * Reads and decodes the start key stored under {@link #START_BASE64}.
   * @param conf configuration to read from
   * @return the decoded start key, or {@code null} if the property is not set
   */
  static byte[] getStartKey(Configuration conf) {
    return getBase64KeyFromConf(conf, START_BASE64);
  }

  /**
   * Reads and decodes the end key stored under {@link #END_BASE64}.
   * @param conf configuration to read from
   * @return the decoded end key, or {@code null} if the property is not set
   */
  static byte[] getEndKey(Configuration conf) {
    return getBase64KeyFromConf(conf, END_BASE64);
  }

  /**
   * Looks up a property and Base64-decodes its value.
   * @param conf      configuration to read from
   * @param base64Key name of the property holding the Base64 text
   * @return the decoded bytes, or {@code null} if the property is not set
   */
  private static byte[] getBase64KeyFromConf(Configuration conf, String base64Key) {
    String encoded = conf.get(base64Key);
    if (encoded == null) {
      return null;
    }
    return Base64.getDecoder().decode(encoded);
  }

  /**
   * Picks the partition for {@code key} by locating it among the split boundaries computed
   * between the configured start and end keys.
   * @param key     key to place
   * @param value   ignored
   * @param reduces number of partitions
   * @return {@code 0} when there is a single partition, otherwise the index of the split range
   *         that holds {@code key}; always less than {@code reduces}
   * @throws RuntimeException if {@code key} sorts before the first split boundary, or if the
   *                          computed index would not be below {@code reduces}
   */
  @Override
  public int getPartition(final ImmutableBytesWritable key, final VALUE value, final int reduces) {
    if (reduces == 1) {
      return 0;
    }
    if (this.lastReduces != reduces) {
      // Boundaries depend on the reducer count; recompute (and log) them only when it changes.
      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
      for (byte[] split : this.splits) {
        LOG.info(Bytes.toStringBinary(split));
      }
      this.lastReduces = reduces;
    }
    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(), key.getLength());
    // Below code is from hfile index search.
    if (pos < 0) {
      // A negative result r is mapped to -(r + 1) - 1. NOTE(review): presumably r encodes
      // -(insertion point) - 1, so this yields the boundary just before the key -- confirm
      // against Bytes.binarySearch.
      pos++;
      pos *= -1;
      if (pos == 0) {
        // falls before the beginning of the file.
        throw new RuntimeException("Key outside start/stop range: " + key.toString());
      }
      pos--;
    }
    if (pos >= reduces) {
      // Valid partitions are 0..reduces-1. A key at or past the (exclusive) end key would
      // otherwise produce an out-of-range index; fail here with the same error used for keys
      // that fall before the start key.
      throw new RuntimeException("Key outside start/stop range: " + key.toString());
    }
    return pos;
  }

  /**
   * Returns the configuration last passed to {@link #setConf(Configuration)}.
   * @return the current configuration, or {@code null} if none has been set
   */
  @Override
  public Configuration getConf() {
    return this.c;
  }

  /**
   * Stores the configuration and loads the start and end keys from it.
   * @param conf configuration carrying {@link #START_BASE64} and {@link #END_BASE64}
   * @throws RuntimeException if either key is missing from {@code conf}
   */
  @Override
  public void setConf(Configuration conf) {
    this.c = conf;
    this.startkey = getStartKey(conf);
    this.endkey = getEndKey(conf);
    if (startkey == null || endkey == null) {
      throw new RuntimeException(this.getClass() + " not configured");
    }
    LOG.info("startkey={}, endkey={}", Bytes.toStringBinary(startkey),
      Bytes.toStringBinary(endkey));
    // Reset last reduces count on change of Start / End key
    this.lastReduces = -1;
  }
}