001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.util.Base64;
021import org.apache.hadoop.conf.Configurable;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.apache.hadoop.mapreduce.Partitioner;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A partitioner that takes start and end keys and uses bigdecimal to figure which reduce a key
032 * belongs to. Pass the start and end keys in the Configuration using {@value #START_BASE64} and
033 * {@value #END_BASE64}. The end key needs to be exclusive; i.e. one larger than the biggest key in
034 * your key space. You may be surprised at how this class partitions the space; it may not align
035 * with preconceptions; e.g. a start key of zero and an end key of 100 divided in ten will not make
036 * regions whose range is 0-10, 10-20, and so on. Make your own partitioner if you need the region
037 * spacing to come out a particular way.
038 * @see #START_BASE64
039 * @see #END_BASE64
040 */
041@InterfaceAudience.Public
042public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
043  implements Configurable {
044  private final static Logger LOG = LoggerFactory.getLogger(SimpleTotalOrderPartitioner.class);
045
046  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
047  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
048
049  private Configuration c;
050  private byte[] startkey;
051  private byte[] endkey;
052  private byte[][] splits;
053  private int lastReduces = -1;
054
055  public static void setStartKey(Configuration conf, byte[] startKey) {
056    conf.set(START_BASE64, Bytes.toString(Base64.getEncoder().encode(startKey)));
057  }
058
059  public static void setEndKey(Configuration conf, byte[] endKey) {
060    conf.set(END_BASE64, Bytes.toString(Base64.getEncoder().encode(endKey)));
061  }
062
063  static byte[] getStartKey(Configuration conf) {
064    return getBase64KeyFromConf(conf, START_BASE64);
065  }
066
067  static byte[] getEndKey(Configuration conf) {
068    return getBase64KeyFromConf(conf, END_BASE64);
069  }
070
071  private static byte[] getBase64KeyFromConf(Configuration conf, String base64Key) {
072    String encoded = conf.get(base64Key);
073    if (encoded != null) {
074      return Base64.getDecoder().decode(encoded);
075    }
076    return null;
077  }
078
079  @Override
080  public int getPartition(final ImmutableBytesWritable key, final VALUE value, final int reduces) {
081    if (reduces == 1) return 0;
082    if (this.lastReduces != reduces) {
083      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
084      for (int i = 0; i < splits.length; i++) {
085        LOG.info(Bytes.toStringBinary(splits[i]));
086      }
087      this.lastReduces = reduces;
088    }
089    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(), key.getLength());
090    // Below code is from hfile index search.
091    if (pos < 0) {
092      pos++;
093      pos *= -1;
094      if (pos == 0) {
095        // falls before the beginning of the file.
096        throw new RuntimeException("Key outside start/stop range: " + key.toString());
097      }
098      pos--;
099    }
100    return pos;
101  }
102
103  @Override
104  public Configuration getConf() {
105    return this.c;
106  }
107
108  @Override
109  public void setConf(Configuration conf) {
110    this.c = conf;
111    this.startkey = getStartKey(conf);
112    this.endkey = getEndKey(conf);
113    if (startkey == null || endkey == null) {
114      throw new RuntimeException(this.getClass() + " not configured");
115    }
116    LOG.info(
117      "startkey=" + Bytes.toStringBinary(startkey) + ", endkey=" + Bytes.toStringBinary(endkey));
118    // Reset last reduces count on change of Start / End key
119    this.lastReduces = -1;
120  }
121}