View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.classification.InterfaceStability;
25  import org.apache.hadoop.conf.Configurable;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28  import org.apache.hadoop.hbase.util.Base64;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.apache.hadoop.mapreduce.Partitioner;
31  
32  /**
33   * A partitioner that takes start and end keys and uses bigdecimal to figure
34   * which reduce a key belongs to.  Pass the start and end
35   * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
36   * and <code>hbase.simpletotalorder.end</code>.  The end key needs to be
37   * exclusive; i.e. one larger than the biggest key in your key space.
38   * You may be surprised at how this class partitions the space; it may not
39   * align with preconceptions; e.g. a start key of zero and an end key of 100
40   * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
41   * Make your own partitioner if you need the region spacing to come out a
42   * particular way.
43   * @param <VALUE>
44   * @see #START
45   * @see #END
46   */
47  @InterfaceAudience.Public
48  @InterfaceStability.Stable
49  public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
50  implements Configurable {
51    private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
52  
53    @Deprecated
54    public static final String START = "hbase.simpletotalorder.start";
55    @Deprecated
56    public static final String END = "hbase.simpletotalorder.end";
57    
58    static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
59    static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
60    
61    private Configuration c;
62    private byte [] startkey;
63    private byte [] endkey;
64    private byte [][] splits;
65    private int lastReduces = -1;
66  
67    public static void setStartKey(Configuration conf, byte[] startKey) {
68      conf.set(START_BASE64, Base64.encodeBytes(startKey));
69    }
70    
71    public static void setEndKey(Configuration conf, byte[] endKey) {
72      conf.set(END_BASE64, Base64.encodeBytes(endKey));
73    }
74    
75    @SuppressWarnings("deprecation")
76    static byte[] getStartKey(Configuration conf) {
77      return getKeyFromConf(conf, START_BASE64, START);
78    }
79    
80    @SuppressWarnings("deprecation")
81    static byte[] getEndKey(Configuration conf) {
82      return getKeyFromConf(conf, END_BASE64, END);
83    }
84    
85    private static byte[] getKeyFromConf(Configuration conf,
86        String base64Key, String deprecatedKey) {
87      String encoded = conf.get(base64Key);
88      if (encoded != null) {
89        return Base64.decode(encoded);
90      }
91      String oldStyleVal = conf.get(deprecatedKey);
92      if (oldStyleVal == null) {
93        return null;
94      }
95      LOG.warn("Using deprecated configuration " + deprecatedKey +
96          " - please use static accessor methods instead.");
97      return Bytes.toBytes(oldStyleVal);
98    }
99    
100   @Override
101   public int getPartition(final ImmutableBytesWritable key, final VALUE value,
102       final int reduces) {
103     if (reduces == 1) return 0;
104     if (this.lastReduces != reduces) {
105       this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
106       for (int i = 0; i < splits.length; i++) {
107         LOG.info(Bytes.toStringBinary(splits[i]));
108       }
109       this.lastReduces = reduces;
110     }
111     int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
112       key.getLength(), Bytes.BYTES_RAWCOMPARATOR);
113     // Below code is from hfile index search.
114     if (pos < 0) {
115       pos++;
116       pos *= -1;
117       if (pos == 0) {
118         // falls before the beginning of the file.
119         throw new RuntimeException("Key outside start/stop range: " +
120           key.toString());
121       }
122       pos--;
123     }
124     return pos;
125   }
126 
127   @Override
128   public Configuration getConf() {
129     return this.c;
130   }
131 
132   @Override
133   public void setConf(Configuration conf) {
134     this.c = conf;
135     this.startkey = getStartKey(conf);
136     this.endkey = getEndKey(conf);
137     if (startkey == null || endkey == null) {
138       throw new RuntimeException(this.getClass() + " not configured");
139     }
140     LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
141         ", endkey=" + Bytes.toStringBinary(endkey));
142     // Reset last reduces count on change of Start / End key
143     this.lastReduces = -1;
144   }
145 }