View Javadoc

1   /**
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapreduce;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configurable;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
27  import org.apache.hadoop.hbase.util.Base64;
28  import org.apache.hadoop.hbase.util.Bytes;
29  import org.apache.hadoop.mapreduce.Partitioner;
30  
31  /**
32   * A partitioner that takes start and end keys and uses bigdecimal to figure
33   * which reduce a key belongs to.  Pass the start and end
34   * keys in the Configuration using <code>hbase.simpletotalorder.start</code>
35   * and <code>hbase.simpletotalorder.end</code>.  The end key needs to be
36   * exclusive; i.e. one larger than the biggest key in your key space.
37   * You may be surprised at how this class partitions the space; it may not
38   * align with preconceptions; e.g. a start key of zero and an end key of 100
39   * divided in ten will not make regions whose range is 0-10, 10-20, and so on.
40   * Make your own partitioner if you need the region spacing to come out a
41   * particular way.
42   * @param <VALUE>
43   * @see #START
44   * @see #END
45   */
46  public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
47  implements Configurable {
48    private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
49  
50    @Deprecated
51    public static final String START = "hbase.simpletotalorder.start";
52    @Deprecated
53    public static final String END = "hbase.simpletotalorder.end";
54    
55    static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
56    static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
57    
58    private Configuration c;
59    private byte [] startkey;
60    private byte [] endkey;
61    private byte [][] splits;
62    private int lastReduces = -1;
63  
64    public static void setStartKey(Configuration conf, byte[] startKey) {
65      conf.set(START_BASE64, Base64.encodeBytes(startKey));
66    }
67    
68    public static void setEndKey(Configuration conf, byte[] endKey) {
69      conf.set(END_BASE64, Base64.encodeBytes(endKey));
70    }
71    
72    @SuppressWarnings("deprecation")
73    static byte[] getStartKey(Configuration conf) {
74      return getKeyFromConf(conf, START_BASE64, START);
75    }
76    
77    @SuppressWarnings("deprecation")
78    static byte[] getEndKey(Configuration conf) {
79      return getKeyFromConf(conf, END_BASE64, END);
80    }
81    
82    private static byte[] getKeyFromConf(Configuration conf,
83        String base64Key, String deprecatedKey) {
84      String encoded = conf.get(base64Key);
85      if (encoded != null) {
86        return Base64.decode(encoded);
87      }
88      String oldStyleVal = conf.get(deprecatedKey);
89      if (oldStyleVal == null) {
90        return null;
91      }
92      LOG.warn("Using deprecated configuration " + deprecatedKey +
93          " - please use static accessor methods instead.");
94      return Bytes.toBytes(oldStyleVal);
95    }
96    
97    @Override
98    public int getPartition(final ImmutableBytesWritable key, final VALUE value,
99        final int reduces) {
100     if (reduces == 1) return 0;
101     if (this.lastReduces != reduces) {
102       this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
103       for (int i = 0; i < splits.length; i++) {
104         LOG.info(Bytes.toStringBinary(splits[i]));
105       }
106     }
107     int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
108       key.getLength(), Bytes.BYTES_RAWCOMPARATOR);
109     // Below code is from hfile index search.
110     if (pos < 0) {
111       pos++;
112       pos *= -1;
113       if (pos == 0) {
114         // falls before the beginning of the file.
115         throw new RuntimeException("Key outside start/stop range: " +
116           key.toString());
117       }
118       pos--;
119     }
120     return pos;
121   }
122 
123   @Override
124   public Configuration getConf() {
125     return this.c;
126   }
127 
128   @Override
129   public void setConf(Configuration conf) {
130     this.c = conf;
131     this.startkey = getStartKey(conf);
132     this.endkey = getEndKey(conf);
133     if (startkey == null || endkey == null) {
134       throw new RuntimeException(this.getClass() + " not configured");
135     }
136     LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
137         ", endkey=" + Bytes.toStringBinary(endkey));
138   }
139 }