001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.util.Base64;
021import org.apache.hadoop.conf.Configurable;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.apache.hadoop.mapreduce.Partitioner;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A partitioner that takes start and end keys and uses bigdecimal to figure which reduce a key
032 * belongs to. Pass the start and end keys in the Configuration using
033 * <code>hbase.simpletotalorder.start</code> and <code>hbase.simpletotalorder.end</code>. The end
034 * key needs to be exclusive; i.e. one larger than the biggest key in your key space. You may be
035 * surprised at how this class partitions the space; it may not align with preconceptions; e.g. a
036 * start key of zero and an end key of 100 divided in ten will not make regions whose range is 0-10,
037 * 10-20, and so on. Make your own partitioner if you need the region spacing to come out a
038 * particular way.
039 * @param <VALUE>
040 * @see #START
041 * @see #END
042 */
043@InterfaceAudience.Public
044public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
045  implements Configurable {
046  private final static Logger LOG = LoggerFactory.getLogger(SimpleTotalOrderPartitioner.class);
047
048  /**
049   * @deprecated since 0.90.0
050   * @see <a href="https://issues.apache.org/jira/browse/HBASE-1923">HBASE-1923</a>
051   */
052  @Deprecated
053  public static final String START = "hbase.simpletotalorder.start";
054
055  /**
056   * @deprecated since 0.90.0
057   * @see <a href="https://issues.apache.org/jira/browse/HBASE-1923">HBASE-1923</a>
058   */
059  @Deprecated
060  public static final String END = "hbase.simpletotalorder.end";
061
062  static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
063  static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
064
065  private Configuration c;
066  private byte[] startkey;
067  private byte[] endkey;
068  private byte[][] splits;
069  private int lastReduces = -1;
070
071  public static void setStartKey(Configuration conf, byte[] startKey) {
072    conf.set(START_BASE64, Bytes.toString(Base64.getEncoder().encode(startKey)));
073  }
074
075  public static void setEndKey(Configuration conf, byte[] endKey) {
076    conf.set(END_BASE64, Bytes.toString(Base64.getEncoder().encode(endKey)));
077  }
078
079  @SuppressWarnings("deprecation")
080  static byte[] getStartKey(Configuration conf) {
081    return getKeyFromConf(conf, START_BASE64, START);
082  }
083
084  @SuppressWarnings("deprecation")
085  static byte[] getEndKey(Configuration conf) {
086    return getKeyFromConf(conf, END_BASE64, END);
087  }
088
089  private static byte[] getKeyFromConf(Configuration conf, String base64Key, String deprecatedKey) {
090    String encoded = conf.get(base64Key);
091    if (encoded != null) {
092      return Base64.getDecoder().decode(encoded);
093    }
094    String oldStyleVal = conf.get(deprecatedKey);
095    if (oldStyleVal == null) {
096      return null;
097    }
098    LOG.warn("Using deprecated configuration " + deprecatedKey
099      + " - please use static accessor methods instead.");
100    return Bytes.toBytesBinary(oldStyleVal);
101  }
102
103  @Override
104  public int getPartition(final ImmutableBytesWritable key, final VALUE value, final int reduces) {
105    if (reduces == 1) return 0;
106    if (this.lastReduces != reduces) {
107      this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
108      for (int i = 0; i < splits.length; i++) {
109        LOG.info(Bytes.toStringBinary(splits[i]));
110      }
111      this.lastReduces = reduces;
112    }
113    int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(), key.getLength());
114    // Below code is from hfile index search.
115    if (pos < 0) {
116      pos++;
117      pos *= -1;
118      if (pos == 0) {
119        // falls before the beginning of the file.
120        throw new RuntimeException("Key outside start/stop range: " + key.toString());
121      }
122      pos--;
123    }
124    return pos;
125  }
126
127  @Override
128  public Configuration getConf() {
129    return this.c;
130  }
131
132  @Override
133  public void setConf(Configuration conf) {
134    this.c = conf;
135    this.startkey = getStartKey(conf);
136    this.endkey = getEndKey(conf);
137    if (startkey == null || endkey == null) {
138      throw new RuntimeException(this.getClass() + " not configured");
139    }
140    LOG.info(
141      "startkey=" + Bytes.toStringBinary(startkey) + ", endkey=" + Bytes.toStringBinary(endkey));
142    // Reset last reduces count on change of Start / End key
143    this.lastReduces = -1;
144  }
145}