1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.NavigableSet;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.Coprocessor;
31 import org.apache.hadoop.hbase.CoprocessorEnvironment;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.client.Scan;
34 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
38 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
39 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
40 import org.apache.hadoop.hbase.regionserver.InternalScanner;
41
42 import com.google.protobuf.ByteString;
43 import com.google.protobuf.Message;
44 import com.google.protobuf.RpcCallback;
45 import com.google.protobuf.RpcController;
46 import com.google.protobuf.Service;
47
48
49
50
51
52
53
54
55
56
57
58
59
60 @InterfaceAudience.Private
61 public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
62 extends AggregateService implements CoprocessorService, Coprocessor {
63 protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
64 private RegionCoprocessorEnvironment env;
65
66
67
68
69
70
71
72
73 @Override
74 public void getMax(RpcController controller, AggregateRequest request,
75 RpcCallback<AggregateResponse> done) {
76 InternalScanner scanner = null;
77 AggregateResponse response = null;
78 T max = null;
79 try {
80 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
81 T temp;
82 Scan scan = ProtobufUtil.toScan(request.getScan());
83 scanner = env.getRegion().getScanner(scan);
84 List<Cell> results = new ArrayList<Cell>();
85 byte[] colFamily = scan.getFamilies()[0];
86 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
87 byte[] qualifier = null;
88 if (qualifiers != null && !qualifiers.isEmpty()) {
89 qualifier = qualifiers.pollFirst();
90 }
91
92 boolean hasMoreRows = false;
93 do {
94 hasMoreRows = scanner.next(results);
95 int listSize = results.size();
96 for (int i = 0; i < listSize; i++) {
97 temp = ci.getValue(colFamily, qualifier, results.get(i));
98 max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
99 }
100 results.clear();
101 } while (hasMoreRows);
102 if (max != null) {
103 AggregateResponse.Builder builder = AggregateResponse.newBuilder();
104 builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
105 response = builder.build();
106 }
107 } catch (IOException e) {
108 ResponseConverter.setControllerException(controller, e);
109 } finally {
110 if (scanner != null) {
111 try {
112 scanner.close();
113 } catch (IOException ignored) {}
114 }
115 }
116 log.info("Maximum from this region is "
117 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + max);
118 done.run(response);
119 }
120
121
122
123
124
125
126
127
128 @Override
129 public void getMin(RpcController controller, AggregateRequest request,
130 RpcCallback<AggregateResponse> done) {
131 AggregateResponse response = null;
132 InternalScanner scanner = null;
133 T min = null;
134 try {
135 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
136 T temp;
137 Scan scan = ProtobufUtil.toScan(request.getScan());
138 scanner = env.getRegion().getScanner(scan);
139 List<Cell> results = new ArrayList<Cell>();
140 byte[] colFamily = scan.getFamilies()[0];
141 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
142 byte[] qualifier = null;
143 if (qualifiers != null && !qualifiers.isEmpty()) {
144 qualifier = qualifiers.pollFirst();
145 }
146 boolean hasMoreRows = false;
147 do {
148 hasMoreRows = scanner.next(results);
149 int listSize = results.size();
150 for (int i = 0; i < listSize; i++) {
151 temp = ci.getValue(colFamily, qualifier, results.get(i));
152 min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
153 }
154 results.clear();
155 } while (hasMoreRows);
156 if (min != null) {
157 response = AggregateResponse.newBuilder().addFirstPart(
158 ci.getProtoForCellType(min).toByteString()).build();
159 }
160 } catch (IOException e) {
161 ResponseConverter.setControllerException(controller, e);
162 } finally {
163 if (scanner != null) {
164 try {
165 scanner.close();
166 } catch (IOException ignored) {}
167 }
168 }
169 log.info("Minimum from this region is "
170 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + min);
171 done.run(response);
172 }
173
174
175
176
177
178
179
180
181 @Override
182 public void getSum(RpcController controller, AggregateRequest request,
183 RpcCallback<AggregateResponse> done) {
184 AggregateResponse response = null;
185 InternalScanner scanner = null;
186 long sum = 0l;
187 try {
188 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
189 S sumVal = null;
190 T temp;
191 Scan scan = ProtobufUtil.toScan(request.getScan());
192 scanner = env.getRegion().getScanner(scan);
193 byte[] colFamily = scan.getFamilies()[0];
194 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
195 byte[] qualifier = null;
196 if (qualifiers != null && !qualifiers.isEmpty()) {
197 qualifier = qualifiers.pollFirst();
198 }
199 List<Cell> results = new ArrayList<Cell>();
200 boolean hasMoreRows = false;
201 do {
202 hasMoreRows = scanner.next(results);
203 int listSize = results.size();
204 for (int i = 0; i < listSize; i++) {
205 temp = ci.getValue(colFamily, qualifier, results.get(i));
206 if (temp != null)
207 sumVal = ci.add(sumVal, ci.castToReturnType(temp));
208 }
209 results.clear();
210 } while (hasMoreRows);
211 if (sumVal != null) {
212 response = AggregateResponse.newBuilder().addFirstPart(
213 ci.getProtoForPromotedType(sumVal).toByteString()).build();
214 }
215 } catch (IOException e) {
216 ResponseConverter.setControllerException(controller, e);
217 } finally {
218 if (scanner != null) {
219 try {
220 scanner.close();
221 } catch (IOException ignored) {}
222 }
223 }
224 log.debug("Sum from this region is "
225 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum);
226 done.run(response);
227 }
228
229
230
231
232
233 @Override
234 public void getRowNum(RpcController controller, AggregateRequest request,
235 RpcCallback<AggregateResponse> done) {
236 AggregateResponse response = null;
237 long counter = 0l;
238 List<Cell> results = new ArrayList<Cell>();
239 InternalScanner scanner = null;
240 try {
241 Scan scan = ProtobufUtil.toScan(request.getScan());
242 byte[][] colFamilies = scan.getFamilies();
243 byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
244 NavigableSet<byte[]> qualifiers = colFamilies != null ?
245 scan.getFamilyMap().get(colFamily) : null;
246 byte[] qualifier = null;
247 if (qualifiers != null && !qualifiers.isEmpty()) {
248 qualifier = qualifiers.pollFirst();
249 }
250 if (scan.getFilter() == null && qualifier == null)
251 scan.setFilter(new FirstKeyOnlyFilter());
252 scanner = env.getRegion().getScanner(scan);
253 boolean hasMoreRows = false;
254 do {
255 hasMoreRows = scanner.next(results);
256 if (results.size() > 0) {
257 counter++;
258 }
259 results.clear();
260 } while (hasMoreRows);
261 ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
262 bb.rewind();
263 response = AggregateResponse.newBuilder().addFirstPart(
264 ByteString.copyFrom(bb)).build();
265 } catch (IOException e) {
266 ResponseConverter.setControllerException(controller, e);
267 } finally {
268 if (scanner != null) {
269 try {
270 scanner.close();
271 } catch (IOException ignored) {}
272 }
273 }
274 log.info("Row counter from this region is "
275 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
276 done.run(response);
277 }
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292 @Override
293 public void getAvg(RpcController controller, AggregateRequest request,
294 RpcCallback<AggregateResponse> done) {
295 AggregateResponse response = null;
296 InternalScanner scanner = null;
297 try {
298 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
299 S sumVal = null;
300 Long rowCountVal = 0l;
301 Scan scan = ProtobufUtil.toScan(request.getScan());
302 scanner = env.getRegion().getScanner(scan);
303 byte[] colFamily = scan.getFamilies()[0];
304 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
305 byte[] qualifier = null;
306 if (qualifiers != null && !qualifiers.isEmpty()) {
307 qualifier = qualifiers.pollFirst();
308 }
309 List<Cell> results = new ArrayList<Cell>();
310 boolean hasMoreRows = false;
311
312 do {
313 results.clear();
314 hasMoreRows = scanner.next(results);
315 int listSize = results.size();
316 for (int i = 0; i < listSize; i++) {
317 sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
318 qualifier, results.get(i))));
319 }
320 rowCountVal++;
321 } while (hasMoreRows);
322 if (sumVal != null) {
323 ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
324 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
325 pair.addFirstPart(first);
326 ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
327 bb.rewind();
328 pair.setSecondPart(ByteString.copyFrom(bb));
329 response = pair.build();
330 }
331 } catch (IOException e) {
332 ResponseConverter.setControllerException(controller, e);
333 } finally {
334 if (scanner != null) {
335 try {
336 scanner.close();
337 } catch (IOException ignored) {}
338 }
339 }
340 done.run(response);
341 }
342
343
344
345
346
347
348
349
350
351
352 @Override
353 public void getStd(RpcController controller, AggregateRequest request,
354 RpcCallback<AggregateResponse> done) {
355 InternalScanner scanner = null;
356 AggregateResponse response = null;
357 try {
358 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
359 S sumVal = null, sumSqVal = null, tempVal = null;
360 long rowCountVal = 0l;
361 Scan scan = ProtobufUtil.toScan(request.getScan());
362 scanner = env.getRegion().getScanner(scan);
363 byte[] colFamily = scan.getFamilies()[0];
364 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
365 byte[] qualifier = null;
366 if (qualifiers != null && !qualifiers.isEmpty()) {
367 qualifier = qualifiers.pollFirst();
368 }
369 List<Cell> results = new ArrayList<Cell>();
370
371 boolean hasMoreRows = false;
372
373 do {
374 tempVal = null;
375 hasMoreRows = scanner.next(results);
376 int listSize = results.size();
377 for (int i = 0; i < listSize; i++) {
378 tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
379 qualifier, results.get(i))));
380 }
381 results.clear();
382 sumVal = ci.add(sumVal, tempVal);
383 sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
384 rowCountVal++;
385 } while (hasMoreRows);
386 if (sumVal != null) {
387 ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
388 ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
389 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
390 pair.addFirstPart(first_sumVal);
391 pair.addFirstPart(first_sumSqVal);
392 ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
393 bb.rewind();
394 pair.setSecondPart(ByteString.copyFrom(bb));
395 response = pair.build();
396 }
397 } catch (IOException e) {
398 ResponseConverter.setControllerException(controller, e);
399 } finally {
400 if (scanner != null) {
401 try {
402 scanner.close();
403 } catch (IOException ignored) {}
404 }
405 }
406 done.run(response);
407 }
408
409
410
411
412
413
414
415
416
417 @Override
418 public void getMedian(RpcController controller, AggregateRequest request,
419 RpcCallback<AggregateResponse> done) {
420 AggregateResponse response = null;
421 InternalScanner scanner = null;
422 try {
423 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
424 S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
425 Scan scan = ProtobufUtil.toScan(request.getScan());
426 scanner = env.getRegion().getScanner(scan);
427 byte[] colFamily = scan.getFamilies()[0];
428 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
429 byte[] valQualifier = null, weightQualifier = null;
430 if (qualifiers != null && !qualifiers.isEmpty()) {
431 valQualifier = qualifiers.pollFirst();
432
433 weightQualifier = qualifiers.pollLast();
434 }
435 List<Cell> results = new ArrayList<Cell>();
436
437 boolean hasMoreRows = false;
438
439 do {
440 tempVal = null;
441 tempWeight = null;
442 hasMoreRows = scanner.next(results);
443 int listSize = results.size();
444 for (int i = 0; i < listSize; i++) {
445 Cell kv = results.get(i);
446 tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
447 valQualifier, kv)));
448 if (weightQualifier != null) {
449 tempWeight = ci.add(tempWeight,
450 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
451 }
452 }
453 results.clear();
454 sumVal = ci.add(sumVal, tempVal);
455 sumWeights = ci.add(sumWeights, tempWeight);
456 } while (hasMoreRows);
457 ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
458 S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
459 ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
460 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
461 pair.addFirstPart(first_sumVal);
462 pair.addFirstPart(first_sumWeights);
463 response = pair.build();
464 } catch (IOException e) {
465 ResponseConverter.setControllerException(controller, e);
466 } finally {
467 if (scanner != null) {
468 try {
469 scanner.close();
470 } catch (IOException ignored) {}
471 }
472 }
473 done.run(response);
474 }
475
476 @SuppressWarnings("unchecked")
477 ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
478 AggregateRequest request) throws IOException {
479 String className = request.getInterpreterClassName();
480 Class<?> cls;
481 try {
482 cls = Class.forName(className);
483 ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
484 if (request.hasInterpreterSpecificBytes()) {
485 ByteString b = request.getInterpreterSpecificBytes();
486 P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
487 ci.initialize(initMsg);
488 }
489 return ci;
490 } catch (ClassNotFoundException e) {
491 throw new IOException(e);
492 } catch (InstantiationException e) {
493 throw new IOException(e);
494 } catch (IllegalAccessException e) {
495 throw new IOException(e);
496 }
497 }
498
499 @Override
500 public Service getService() {
501 return this;
502 }
503
504
505
506
507
508
509
510
511
512
513
514 @Override
515 public void start(CoprocessorEnvironment env) throws IOException {
516 if (env instanceof RegionCoprocessorEnvironment) {
517 this.env = (RegionCoprocessorEnvironment)env;
518 } else {
519 throw new CoprocessorException("Must be loaded on a table region!");
520 }
521 }
522
523 @Override
524 public void stop(CoprocessorEnvironment env) throws IOException {
525
526 }
527
528 }