1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.classification.InterfaceStability;
25 import org.apache.hadoop.conf.Configurable;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28 import org.apache.hadoop.hbase.util.Base64;
29 import org.apache.hadoop.hbase.util.Bytes;
30 import org.apache.hadoop.mapreduce.Partitioner;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Stable
49 public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
50 implements Configurable {
51 private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
52
53 @Deprecated
54 public static final String START = "hbase.simpletotalorder.start";
55 @Deprecated
56 public static final String END = "hbase.simpletotalorder.end";
57
58 static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
59 static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
60
61 private Configuration c;
62 private byte [] startkey;
63 private byte [] endkey;
64 private byte [][] splits;
65 private int lastReduces = -1;
66
67 public static void setStartKey(Configuration conf, byte[] startKey) {
68 conf.set(START_BASE64, Base64.encodeBytes(startKey));
69 }
70
71 public static void setEndKey(Configuration conf, byte[] endKey) {
72 conf.set(END_BASE64, Base64.encodeBytes(endKey));
73 }
74
75 @SuppressWarnings("deprecation")
76 static byte[] getStartKey(Configuration conf) {
77 return getKeyFromConf(conf, START_BASE64, START);
78 }
79
80 @SuppressWarnings("deprecation")
81 static byte[] getEndKey(Configuration conf) {
82 return getKeyFromConf(conf, END_BASE64, END);
83 }
84
85 private static byte[] getKeyFromConf(Configuration conf,
86 String base64Key, String deprecatedKey) {
87 String encoded = conf.get(base64Key);
88 if (encoded != null) {
89 return Base64.decode(encoded);
90 }
91 String oldStyleVal = conf.get(deprecatedKey);
92 if (oldStyleVal == null) {
93 return null;
94 }
95 LOG.warn("Using deprecated configuration " + deprecatedKey +
96 " - please use static accessor methods instead.");
97 return Bytes.toBytes(oldStyleVal);
98 }
99
100 @Override
101 public int getPartition(final ImmutableBytesWritable key, final VALUE value,
102 final int reduces) {
103 if (reduces == 1) return 0;
104 if (this.lastReduces != reduces) {
105 this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
106 for (int i = 0; i < splits.length; i++) {
107 LOG.info(Bytes.toStringBinary(splits[i]));
108 }
109 this.lastReduces = reduces;
110 }
111 int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
112 key.getLength(), Bytes.BYTES_RAWCOMPARATOR);
113
114 if (pos < 0) {
115 pos++;
116 pos *= -1;
117 if (pos == 0) {
118
119 throw new RuntimeException("Key outside start/stop range: " +
120 key.toString());
121 }
122 pos--;
123 }
124 return pos;
125 }
126
127 @Override
128 public Configuration getConf() {
129 return this.c;
130 }
131
132 @Override
133 public void setConf(Configuration conf) {
134 this.c = conf;
135 this.startkey = getStartKey(conf);
136 this.endkey = getEndKey(conf);
137 if (startkey == null || endkey == null) {
138 throw new RuntimeException(this.getClass() + " not configured");
139 }
140 LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
141 ", endkey=" + Bytes.toStringBinary(endkey));
142
143 this.lastReduces = -1;
144 }
145 }