View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.classification.InterfaceStability;
25  import org.apache.hadoop.conf.Configurable;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28  import org.apache.hadoop.hbase.util.Base64;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.apache.hadoop.mapreduce.Partitioner;
31  
32  /**
33   * A partitioner that takes start and end keys and uses bigdecimal to figure
34   * which reduce a key belongs to.  Pass the start and end
35   * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
36   * and <code>hbase.simpletotalorder.end</code>.  The end key needs to be
37   * exclusive; i.e. one larger than the biggest key in your key space.
38   * You may be surprised at how this class partitions the space; it may not
39   * align with preconceptions; e.g. a start key of zero and an end key of 100
40   * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
41   * Make your own partitioner if you need the region spacing to come out a
42   * particular way.
43   * @param <VALUE>
44   * @see #START
45   * @see #END
46   */
47  @InterfaceAudience.Public
48  @InterfaceStability.Stable
49  public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
50  implements Configurable {
51    private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
52  
53    @Deprecated
54    public static final String START = "hbase.simpletotalorder.start";
55    @Deprecated
56    public static final String END = "hbase.simpletotalorder.end";
57    
58    static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
59    static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
60    
61    private Configuration c;
62    private byte [] startkey;
63    private byte [] endkey;
64    private byte [][] splits;
65    private int lastReduces = -1;
66  
67    public static void setStartKey(Configuration conf, byte[] startKey) {
68      conf.set(START_BASE64, Base64.encodeBytes(startKey));
69    }
70    
71    public static void setEndKey(Configuration conf, byte[] endKey) {
72      conf.set(END_BASE64, Base64.encodeBytes(endKey));
73    }
74    
75    @SuppressWarnings("deprecation")
76    static byte[] getStartKey(Configuration conf) {
77      return getKeyFromConf(conf, START_BASE64, START);
78    }
79    
80    @SuppressWarnings("deprecation")
81    static byte[] getEndKey(Configuration conf) {
82      return getKeyFromConf(conf, END_BASE64, END);
83    }
84    
85    private static byte[] getKeyFromConf(Configuration conf,
86        String base64Key, String deprecatedKey) {
87      String encoded = conf.get(base64Key);
88      if (encoded != null) {
89        return Base64.decode(encoded);
90      }
91      String oldStyleVal = conf.get(deprecatedKey);
92      if (oldStyleVal == null) {
93        return null;
94      }
95      LOG.warn("Using deprecated configuration " + deprecatedKey +
96          " - please use static accessor methods instead.");
97      return Bytes.toBytes(oldStyleVal);
98    }
99    
100   @Override
101   public int getPartition(final ImmutableBytesWritable key, final VALUE value,
102       final int reduces) {
103     if (reduces == 1) return 0;
104     if (this.lastReduces != reduces) {
105       this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
106       for (int i = 0; i < splits.length; i++) {
107         LOG.info(Bytes.toStringBinary(splits[i]));
108       }
109     }
110     int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
111       key.getLength(), Bytes.BYTES_RAWCOMPARATOR);
112     // Below code is from hfile index search.
113     if (pos < 0) {
114       pos++;
115       pos *= -1;
116       if (pos == 0) {
117         // falls before the beginning of the file.
118         throw new RuntimeException("Key outside start/stop range: " +
119           key.toString());
120       }
121       pos--;
122     }
123     return pos;
124   }
125 
126   @Override
127   public Configuration getConf() {
128     return this.c;
129   }
130 
131   @Override
132   public void setConf(Configuration conf) {
133     this.c = conf;
134     this.startkey = getStartKey(conf);
135     this.endkey = getEndKey(conf);
136     if (startkey == null || endkey == null) {
137       throw new RuntimeException(this.getClass() + " not configured");
138     }
139     LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
140         ", endkey=" + Bytes.toStringBinary(endkey));
141   }
142 }