001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.util.Base64;
022
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026import org.apache.hadoop.conf.Configurable;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.mapreduce.Partitioner;
031
032/**
033 * A partitioner that takes start and end keys and uses bigdecimal to figure
034 * which reduce a key belongs to.  Pass the start and end
035 * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
036 * and <code>hbase.simpletotalorder.end</code>.  The end key needs to be
037 * exclusive; i.e. one larger than the biggest key in your key space.
038 * You may be surprised at how this class partitions the space; it may not
039 * align with preconceptions; e.g. a start key of zero and an end key of 100
040 * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
041 * Make your own partitioner if you need the region spacing to come out a
042 * particular way.
043 * @param <VALUE>
044 * @see #START
045 * @see #END
046 */
047@InterfaceAudience.Public
048public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
049implements Configurable {
050  private final static Logger LOG = LoggerFactory.getLogger(SimpleTotalOrderPartitioner.class);
051
052  /**
053   * @deprecated since 0.90.0
054   * @see <a href="https://issues.apache.org/jira/browse/HBASE-1923">HBASE-1923</a>
055   */
056  @Deprecated
057  public static final String START = "hbase.simpletotalorder.start";
058
059  /**
060   * @deprecated since 0.90.0
061   * @see <a href="https://issues.apache.org/jira/browse/HBASE-1923">HBASE-1923</a>
062   */
063  @Deprecated
064  public static final String END = "hbase.simpletotalorder.end";
065
066  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
067  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
068
069  private Configuration c;
070  private byte [] startkey;
071  private byte [] endkey;
072  private byte [][] splits;
073  private int lastReduces = -1;
074
075  public static void setStartKey(Configuration conf, byte[] startKey) {
076    conf.set(START_BASE64, Bytes.toString(Base64.getEncoder().encode(startKey)));
077  }
078
079  public static void setEndKey(Configuration conf, byte[] endKey) {
080    conf.set(END_BASE64, Bytes.toString(Base64.getEncoder().encode(endKey)));
081  }
082
083  @SuppressWarnings("deprecation")
084  static byte[] getStartKey(Configuration conf) {
085    return getKeyFromConf(conf, START_BASE64, START);
086  }
087
088  @SuppressWarnings("deprecation")
089  static byte[] getEndKey(Configuration conf) {
090    return getKeyFromConf(conf, END_BASE64, END);
091  }
092
093  private static byte[] getKeyFromConf(Configuration conf,
094      String base64Key, String deprecatedKey) {
095    String encoded = conf.get(base64Key);
096    if (encoded != null) {
097      return Base64.getDecoder().decode(encoded);
098    }
099    String oldStyleVal = conf.get(deprecatedKey);
100    if (oldStyleVal == null) {
101      return null;
102    }
103    LOG.warn("Using deprecated configuration " + deprecatedKey +
104        " - please use static accessor methods instead.");
105    return Bytes.toBytesBinary(oldStyleVal);
106  }
107
108  @Override
109  public int getPartition(final ImmutableBytesWritable key, final VALUE value,
110      final int reduces) {
111    if (reduces == 1) return 0;
112    if (this.lastReduces != reduces) {
113      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
114      for (int i = 0; i < splits.length; i++) {
115        LOG.info(Bytes.toStringBinary(splits[i]));
116      }
117      this.lastReduces = reduces;
118    }
119    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
120      key.getLength());
121    // Below code is from hfile index search.
122    if (pos < 0) {
123      pos++;
124      pos *= -1;
125      if (pos == 0) {
126        // falls before the beginning of the file.
127        throw new RuntimeException("Key outside start/stop range: " +
128          key.toString());
129      }
130      pos--;
131    }
132    return pos;
133  }
134
135  @Override
136  public Configuration getConf() {
137    return this.c;
138  }
139
140  @Override
141  public void setConf(Configuration conf) {
142    this.c = conf;
143    this.startkey = getStartKey(conf);
144    this.endkey = getEndKey(conf);
145    if (startkey == null || endkey == null) {
146      throw new RuntimeException(this.getClass() + " not configured");
147    }
148    LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
149        ", endkey=" + Bytes.toStringBinary(endkey));
150    // Reset last reduces count on change of Start / End key
151    this.lastReduces = -1;
152  }
153}