1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.classification.InterfaceStability;
25 import org.apache.hadoop.conf.Configurable;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28 import org.apache.hadoop.hbase.util.Base64;
29 import org.apache.hadoop.hbase.util.Bytes;
30 import org.apache.hadoop.mapreduce.Partitioner;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Stable
49 public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
50 implements Configurable {
51 private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
52
53 @Deprecated
54 public static final String START = "hbase.simpletotalorder.start";
55 @Deprecated
56 public static final String END = "hbase.simpletotalorder.end";
57
58 static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
59 static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
60
61 private Configuration c;
62 private byte [] startkey;
63 private byte [] endkey;
64 private byte [][] splits;
65 private int lastReduces = -1;
66
67 public static void setStartKey(Configuration conf, byte[] startKey) {
68 conf.set(START_BASE64, Base64.encodeBytes(startKey));
69 }
70
71 public static void setEndKey(Configuration conf, byte[] endKey) {
72 conf.set(END_BASE64, Base64.encodeBytes(endKey));
73 }
74
75 @SuppressWarnings("deprecation")
76 static byte[] getStartKey(Configuration conf) {
77 return getKeyFromConf(conf, START_BASE64, START);
78 }
79
80 @SuppressWarnings("deprecation")
81 static byte[] getEndKey(Configuration conf) {
82 return getKeyFromConf(conf, END_BASE64, END);
83 }
84
85 private static byte[] getKeyFromConf(Configuration conf,
86 String base64Key, String deprecatedKey) {
87 String encoded = conf.get(base64Key);
88 if (encoded != null) {
89 return Base64.decode(encoded);
90 }
91 String oldStyleVal = conf.get(deprecatedKey);
92 if (oldStyleVal == null) {
93 return null;
94 }
95 LOG.warn("Using deprecated configuration " + deprecatedKey +
96 " - please use static accessor methods instead.");
97 return Bytes.toBytes(oldStyleVal);
98 }
99
100 @Override
101 public int getPartition(final ImmutableBytesWritable key, final VALUE value,
102 final int reduces) {
103 if (reduces == 1) return 0;
104 if (this.lastReduces != reduces) {
105 this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
106 for (int i = 0; i < splits.length; i++) {
107 LOG.info(Bytes.toStringBinary(splits[i]));
108 }
109 }
110 int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
111 key.getLength(), Bytes.BYTES_RAWCOMPARATOR);
112
113 if (pos < 0) {
114 pos++;
115 pos *= -1;
116 if (pos == 0) {
117
118 throw new RuntimeException("Key outside start/stop range: " +
119 key.toString());
120 }
121 pos--;
122 }
123 return pos;
124 }
125
126 @Override
127 public Configuration getConf() {
128 return this.c;
129 }
130
131 @Override
132 public void setConf(Configuration conf) {
133 this.c = conf;
134 this.startkey = getStartKey(conf);
135 this.endkey = getEndKey(conf);
136 if (startkey == null || endkey == null) {
137 throw new RuntimeException(this.getClass() + " not configured");
138 }
139 LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
140 ", endkey=" + Bytes.toStringBinary(endkey));
141 }
142 }