1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client.coprocessor;
21
22 import java.io.Closeable;
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableMap;
29 import java.util.NavigableSet;
30 import java.util.TreeMap;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.Cell;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.client.Connection;
42 import org.apache.hadoop.hbase.client.ConnectionFactory;
43 import org.apache.hadoop.hbase.client.Result;
44 import org.apache.hadoop.hbase.client.ResultScanner;
45 import org.apache.hadoop.hbase.client.Scan;
46 import org.apache.hadoop.hbase.client.Table;
47 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
48 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
49 import org.apache.hadoop.hbase.ipc.ServerRpcController;
50 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
51 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
52 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
53 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
54 import org.apache.hadoop.hbase.util.Bytes;
55 import org.apache.hadoop.hbase.util.Pair;
56
57 import com.google.protobuf.ByteString;
58 import com.google.protobuf.Message;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 @InterfaceAudience.Private
81 public class AggregationClient implements Closeable {
82
83 private static final Log log = LogFactory.getLog(AggregationClient.class);
84 private final Connection connection;
85
86
87
88
89
90 public AggregationClient(Configuration cfg) {
91 try {
92
93 this.connection = ConnectionFactory.createConnection(cfg);
94 } catch (IOException e) {
95 throw new RuntimeException(e);
96 }
97 }
98
99 @Override
100 public void close() throws IOException {
101 if (this.connection != null && !this.connection.isClosed()) {
102 this.connection.close();
103 }
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118 public <R, S, P extends Message, Q extends Message, T extends Message> R max(
119 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
120 throws Throwable {
121 try (Table table = connection.getTable(tableName)) {
122 return max(table, ci, scan);
123 }
124 }
125
126
127
128
129
130
131
132
133
134
135
136
137
138 public <R, S, P extends Message, Q extends Message, T extends Message>
139 R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
140 final Scan scan) throws Throwable {
141 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
142 class MaxCallBack implements Batch.Callback<R> {
143 R max = null;
144
145 R getMax() {
146 return max;
147 }
148
149 @Override
150 public synchronized void update(byte[] region, byte[] row, R result) {
151 max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
152 }
153 }
154 MaxCallBack aMaxCallBack = new MaxCallBack();
155 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
156 new Batch.Call<AggregateService, R>() {
157 @Override
158 public R call(AggregateService instance) throws IOException {
159 ServerRpcController controller = new ServerRpcController();
160 BlockingRpcCallback<AggregateResponse> rpcCallback =
161 new BlockingRpcCallback<AggregateResponse>();
162 instance.getMax(controller, requestArg, rpcCallback);
163 AggregateResponse response = rpcCallback.get();
164 if (controller.failedOnException()) {
165 throw controller.getFailedOn();
166 }
167 if (response.getFirstPartCount() > 0) {
168 ByteString b = response.getFirstPart(0);
169 Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
170 return ci.getCellValueFromProto(q);
171 }
172 return null;
173 }
174 }, aMaxCallBack);
175 return aMaxCallBack.getMax();
176 }
177
178
179
180
181
182 private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
183 if (scan == null
184 || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
185 .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
186 || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
187 !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
188 throw new IOException(
189 "Agg client Exception: Startrow should be smaller than Stoprow");
190 } else if (!canFamilyBeAbsent) {
191 if (scan.getFamilyMap().size() != 1) {
192 throw new IOException("There must be only one family.");
193 }
194 }
195 }
196
197
198
199
200
201
202
203
204
205
206
207 public <R, S, P extends Message, Q extends Message, T extends Message> R min(
208 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
209 throws Throwable {
210 try (Table table = connection.getTable(tableName)) {
211 return min(table, ci, scan);
212 }
213 }
214
215
216
217
218
219
220
221
222
223
224
225 public <R, S, P extends Message, Q extends Message, T extends Message>
226 R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
227 final Scan scan) throws Throwable {
228 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
229 class MinCallBack implements Batch.Callback<R> {
230
231 private R min = null;
232
233 public R getMinimum() {
234 return min;
235 }
236
237 @Override
238 public synchronized void update(byte[] region, byte[] row, R result) {
239 min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
240 }
241 }
242 MinCallBack minCallBack = new MinCallBack();
243 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
244 new Batch.Call<AggregateService, R>() {
245
246 @Override
247 public R call(AggregateService instance) throws IOException {
248 ServerRpcController controller = new ServerRpcController();
249 BlockingRpcCallback<AggregateResponse> rpcCallback =
250 new BlockingRpcCallback<AggregateResponse>();
251 instance.getMin(controller, requestArg, rpcCallback);
252 AggregateResponse response = rpcCallback.get();
253 if (controller.failedOnException()) {
254 throw controller.getFailedOn();
255 }
256 if (response.getFirstPartCount() > 0) {
257 ByteString b = response.getFirstPart(0);
258 Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
259 return ci.getCellValueFromProto(q);
260 }
261 return null;
262 }
263 }, minCallBack);
264 log.debug("Min fom all regions is: " + minCallBack.getMinimum());
265 return minCallBack.getMinimum();
266 }
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281 public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
282 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
283 throws Throwable {
284 try (Table table = connection.getTable(tableName)) {
285 return rowCount(table, ci, scan);
286 }
287 }
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302 public <R, S, P extends Message, Q extends Message, T extends Message>
303 long rowCount(final Table table,
304 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
305 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
306 class RowNumCallback implements Batch.Callback<Long> {
307 private final AtomicLong rowCountL = new AtomicLong(0);
308
309 public long getRowNumCount() {
310 return rowCountL.get();
311 }
312
313 @Override
314 public void update(byte[] region, byte[] row, Long result) {
315 rowCountL.addAndGet(result.longValue());
316 }
317 }
318 RowNumCallback rowNum = new RowNumCallback();
319 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
320 new Batch.Call<AggregateService, Long>() {
321 @Override
322 public Long call(AggregateService instance) throws IOException {
323 ServerRpcController controller = new ServerRpcController();
324 BlockingRpcCallback<AggregateResponse> rpcCallback =
325 new BlockingRpcCallback<AggregateResponse>();
326 instance.getRowNum(controller, requestArg, rpcCallback);
327 AggregateResponse response = rpcCallback.get();
328 if (controller.failedOnException()) {
329 throw controller.getFailedOn();
330 }
331 byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
332 ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
333 bb.rewind();
334 return bb.getLong();
335 }
336 }, rowNum);
337 return rowNum.getRowNumCount();
338 }
339
340
341
342
343
344
345
346
347
348
349 public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
350 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
351 throws Throwable {
352 try (Table table = connection.getTable(tableName)) {
353 return sum(table, ci, scan);
354 }
355 }
356
357
358
359
360
361
362
363
364
365
366 public <R, S, P extends Message, Q extends Message, T extends Message>
367 S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
368 final Scan scan) throws Throwable {
369 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
370
371 class SumCallBack implements Batch.Callback<S> {
372 S sumVal = null;
373
374 public S getSumResult() {
375 return sumVal;
376 }
377
378 @Override
379 public synchronized void update(byte[] region, byte[] row, S result) {
380 sumVal = ci.add(sumVal, result);
381 }
382 }
383 SumCallBack sumCallBack = new SumCallBack();
384 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
385 new Batch.Call<AggregateService, S>() {
386 @Override
387 public S call(AggregateService instance) throws IOException {
388 ServerRpcController controller = new ServerRpcController();
389 BlockingRpcCallback<AggregateResponse> rpcCallback =
390 new BlockingRpcCallback<AggregateResponse>();
391 instance.getSum(controller, requestArg, rpcCallback);
392 AggregateResponse response = rpcCallback.get();
393 if (controller.failedOnException()) {
394 throw controller.getFailedOn();
395 }
396 if (response.getFirstPartCount() == 0) {
397 return null;
398 }
399 ByteString b = response.getFirstPart(0);
400 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
401 S s = ci.getPromotedValueFromProto(t);
402 return s;
403 }
404 }, sumCallBack);
405 return sumCallBack.getSumResult();
406 }
407
408
409
410
411
412
413
414
415
416 private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
417 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
418 throws Throwable {
419 try (Table table = connection.getTable(tableName)) {
420 return getAvgArgs(table, ci, scan);
421 }
422 }
423
424
425
426
427
428
429
430
431
432 private <R, S, P extends Message, Q extends Message, T extends Message>
433 Pair<S, Long> getAvgArgs(final Table table,
434 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
435 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
436 class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
437 S sum = null;
438 Long rowCount = 0l;
439
440 public synchronized Pair<S, Long> getAvgArgs() {
441 return new Pair<S, Long>(sum, rowCount);
442 }
443
444 @Override
445 public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
446 sum = ci.add(sum, result.getFirst());
447 rowCount += result.getSecond();
448 }
449 }
450 AvgCallBack avgCallBack = new AvgCallBack();
451 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
452 new Batch.Call<AggregateService, Pair<S, Long>>() {
453 @Override
454 public Pair<S, Long> call(AggregateService instance) throws IOException {
455 ServerRpcController controller = new ServerRpcController();
456 BlockingRpcCallback<AggregateResponse> rpcCallback =
457 new BlockingRpcCallback<AggregateResponse>();
458 instance.getAvg(controller, requestArg, rpcCallback);
459 AggregateResponse response = rpcCallback.get();
460 if (controller.failedOnException()) {
461 throw controller.getFailedOn();
462 }
463 Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
464 if (response.getFirstPartCount() == 0) {
465 return pair;
466 }
467 ByteString b = response.getFirstPart(0);
468 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
469 S s = ci.getPromotedValueFromProto(t);
470 pair.setFirst(s);
471 ByteBuffer bb = ByteBuffer.allocate(8).put(
472 getBytesFromResponse(response.getSecondPart()));
473 bb.rewind();
474 pair.setSecond(bb.getLong());
475 return pair;
476 }
477 }, avgCallBack);
478 return avgCallBack.getAvgArgs();
479 }
480
481
482
483
484
485
486
487
488
489
490
491
492
493 public <R, S, P extends Message, Q extends Message, T extends Message>
494 double avg(final TableName tableName,
495 final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
496 Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
497 return ci.divideForAvg(p.getFirst(), p.getSecond());
498 }
499
500
501
502
503
504
505
506
507
508
509
510
511
512 public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
513 final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
514 Pair<S, Long> p = getAvgArgs(table, ci, scan);
515 return ci.divideForAvg(p.getFirst(), p.getSecond());
516 }
517
518
519
520
521
522
523
524
525
526
527
528
529 private <R, S, P extends Message, Q extends Message, T extends Message>
530 Pair<List<S>, Long> getStdArgs(final Table table,
531 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
532 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
533 class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
534 long rowCountVal = 0l;
535 S sumVal = null, sumSqVal = null;
536
537 public synchronized Pair<List<S>, Long> getStdParams() {
538 List<S> l = new ArrayList<S>();
539 l.add(sumVal);
540 l.add(sumSqVal);
541 Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
542 return p;
543 }
544
545 @Override
546 public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
547 if (result.getFirst().size() > 0) {
548 sumVal = ci.add(sumVal, result.getFirst().get(0));
549 sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
550 rowCountVal += result.getSecond();
551 }
552 }
553 }
554 StdCallback stdCallback = new StdCallback();
555 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
556 new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
557 @Override
558 public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
559 ServerRpcController controller = new ServerRpcController();
560 BlockingRpcCallback<AggregateResponse> rpcCallback =
561 new BlockingRpcCallback<AggregateResponse>();
562 instance.getStd(controller, requestArg, rpcCallback);
563 AggregateResponse response = rpcCallback.get();
564 if (controller.failedOnException()) {
565 throw controller.getFailedOn();
566 }
567 Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
568 if (response.getFirstPartCount() == 0) {
569 return pair;
570 }
571 List<S> list = new ArrayList<S>();
572 for (int i = 0; i < response.getFirstPartCount(); i++) {
573 ByteString b = response.getFirstPart(i);
574 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
575 S s = ci.getPromotedValueFromProto(t);
576 list.add(s);
577 }
578 pair.setFirst(list);
579 ByteBuffer bb = ByteBuffer.allocate(8).put(
580 getBytesFromResponse(response.getSecondPart()));
581 bb.rewind();
582 pair.setSecond(bb.getLong());
583 return pair;
584 }
585 }, stdCallback);
586 return stdCallback.getStdParams();
587 }
588
589
590
591
592
593
594
595
596
597
598
599
600
601 public <R, S, P extends Message, Q extends Message, T extends Message>
602 double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
603 Scan scan) throws Throwable {
604 try (Table table = connection.getTable(tableName)) {
605 return std(table, ci, scan);
606 }
607 }
608
609
610
611
612
613
614
615
616
617
618
619
620
621 public <R, S, P extends Message, Q extends Message, T extends Message> double std(
622 final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
623 Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
624 double res = 0d;
625 double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
626 double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
627 res = avgOfSumSq - (avg) * (avg);
628 res = Math.pow(res, 0.5);
629 return res;
630 }
631
632
633
634
635
636
637
638
639
640
641
642
643
644 private <R, S, P extends Message, Q extends Message, T extends Message>
645 Pair<NavigableMap<byte[], List<S>>, List<S>>
646 getMedianArgs(final Table table,
647 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
648 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
649 final NavigableMap<byte[], List<S>> map =
650 new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
651 class StdCallback implements Batch.Callback<List<S>> {
652 S sumVal = null, sumWeights = null;
653
654 public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
655 List<S> l = new ArrayList<S>();
656 l.add(sumVal);
657 l.add(sumWeights);
658 Pair<NavigableMap<byte[], List<S>>, List<S>> p =
659 new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
660 return p;
661 }
662
663 @Override
664 public synchronized void update(byte[] region, byte[] row, List<S> result) {
665 map.put(row, result);
666 sumVal = ci.add(sumVal, result.get(0));
667 sumWeights = ci.add(sumWeights, result.get(1));
668 }
669 }
670 StdCallback stdCallback = new StdCallback();
671 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
672 new Batch.Call<AggregateService, List<S>>() {
673 @Override
674 public List<S> call(AggregateService instance) throws IOException {
675 ServerRpcController controller = new ServerRpcController();
676 BlockingRpcCallback<AggregateResponse> rpcCallback =
677 new BlockingRpcCallback<AggregateResponse>();
678 instance.getMedian(controller, requestArg, rpcCallback);
679 AggregateResponse response = rpcCallback.get();
680 if (controller.failedOnException()) {
681 throw controller.getFailedOn();
682 }
683
684 List<S> list = new ArrayList<S>();
685 for (int i = 0; i < response.getFirstPartCount(); i++) {
686 ByteString b = response.getFirstPart(i);
687 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
688 S s = ci.getPromotedValueFromProto(t);
689 list.add(s);
690 }
691 return list;
692 }
693
694 }, stdCallback);
695 return stdCallback.getMedianParams();
696 }
697
698
699
700
701
702
703
704
705
706
707
708 public <R, S, P extends Message, Q extends Message, T extends Message>
709 R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
710 Scan scan) throws Throwable {
711 try (Table table = connection.getTable(tableName)) {
712 return median(table, ci, scan);
713 }
714 }
715
716
717
718
719
720
721
722
723
724
725
726 public <R, S, P extends Message, Q extends Message, T extends Message>
727 R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
728 Scan scan) throws Throwable {
729 Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
730 byte[] startRow = null;
731 byte[] colFamily = scan.getFamilies()[0];
732 NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
733 NavigableMap<byte[], List<S>> map = p.getFirst();
734 S sumVal = p.getSecond().get(0);
735 S sumWeights = p.getSecond().get(1);
736 double halfSumVal = ci.divideForAvg(sumVal, 2L);
737 double movingSumVal = 0;
738 boolean weighted = false;
739 if (quals.size() > 1) {
740 weighted = true;
741 halfSumVal = ci.divideForAvg(sumWeights, 2L);
742 }
743
744 for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
745 S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
746 double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
747 if (newSumVal > halfSumVal) break;
748 movingSumVal = newSumVal;
749 startRow = entry.getKey();
750 }
751
752 Scan scan2 = new Scan(scan);
753
754 if (startRow != null) scan2.setStartRow(startRow);
755 ResultScanner scanner = null;
756 try {
757 int cacheSize = scan2.getCaching();
758 if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
759 scan2.setCacheBlocks(true);
760 cacheSize = 5;
761 scan2.setCaching(cacheSize);
762 }
763 scanner = table.getScanner(scan2);
764 Result[] results = null;
765 byte[] qualifier = quals.pollFirst();
766
767 byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
768 R value = null;
769 do {
770 results = scanner.next(cacheSize);
771 if (results != null && results.length > 0) {
772 for (int i = 0; i < results.length; i++) {
773 Result r = results[i];
774
775 Cell kv = r.getColumnLatest(colFamily, weightQualifier);
776 R newValue = ci.getValue(colFamily, weightQualifier, kv);
777 S s = ci.castToReturnType(newValue);
778 double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
779
780 if (newSumVal > halfSumVal) {
781 return value;
782 }
783 movingSumVal = newSumVal;
784 kv = r.getColumnLatest(colFamily, qualifier);
785 value = ci.getValue(colFamily, qualifier, kv);
786 }
787 }
788 } while (results != null && results.length > 0);
789 } finally {
790 if (scanner != null) {
791 scanner.close();
792 }
793 }
794 return null;
795 }
796
797 <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
798 validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
799 throws IOException {
800 validateParameters(scan, canFamilyBeAbsent);
801 final AggregateRequest.Builder requestBuilder =
802 AggregateRequest.newBuilder();
803 requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
804 P columnInterpreterSpecificData = null;
805 if ((columnInterpreterSpecificData = ci.getRequestData())
806 != null) {
807 requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
808 }
809 requestBuilder.setScan(ProtobufUtil.toScan(scan));
810 return requestBuilder.build();
811 }
812
813 byte[] getBytesFromResponse(ByteString response) {
814 ByteBuffer bb = response.asReadOnlyByteBuffer();
815 bb.rewind();
816 byte[] bytes;
817 if (bb.hasArray()) {
818 bytes = bb.array();
819 } else {
820 bytes = response.toByteArray();
821 }
822 return bytes;
823 }
824 }