1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client.coprocessor;
21
22 import java.io.Closeable;
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableMap;
29 import java.util.NavigableSet;
30 import java.util.TreeMap;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.Cell;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.client.Connection;
42 import org.apache.hadoop.hbase.client.ConnectionFactory;
43 import org.apache.hadoop.hbase.client.Result;
44 import org.apache.hadoop.hbase.client.ResultScanner;
45 import org.apache.hadoop.hbase.client.Scan;
46 import org.apache.hadoop.hbase.client.Table;
47 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
48 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
49 import org.apache.hadoop.hbase.ipc.ServerRpcController;
50 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
51 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
52 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
53 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
54 import org.apache.hadoop.hbase.util.Bytes;
55 import org.apache.hadoop.hbase.util.Pair;
56
57 import com.google.protobuf.ByteString;
58 import com.google.protobuf.Message;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 @InterfaceAudience.Private
82 public class AggregationClient implements Closeable {
83
84 private static final Log log = LogFactory.getLog(AggregationClient.class);
85 private final Connection connection;
86
87
88
89
90
91 public AggregationClient(Configuration cfg) {
92 try {
93
94 this.connection = ConnectionFactory.createConnection(cfg);
95 } catch (IOException e) {
96 throw new RuntimeException(e);
97 }
98 }
99
100 @Override
101 public void close() throws IOException {
102 if (this.connection != null && !this.connection.isClosed()) {
103 this.connection.close();
104 }
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119 public <R, S, P extends Message, Q extends Message, T extends Message> R max(
120 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
121 throws Throwable {
122 try (Table table = connection.getTable(tableName)) {
123 return max(table, ci, scan);
124 }
125 }
126
127
128
129
130
131
132
133
134
135
136
137
138
139 public <R, S, P extends Message, Q extends Message, T extends Message>
140 R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
141 final Scan scan) throws Throwable {
142 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
143 class MaxCallBack implements Batch.Callback<R> {
144 R max = null;
145
146 R getMax() {
147 return max;
148 }
149
150 @Override
151 public synchronized void update(byte[] region, byte[] row, R result) {
152 max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
153 }
154 }
155 MaxCallBack aMaxCallBack = new MaxCallBack();
156 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
157 new Batch.Call<AggregateService, R>() {
158 @Override
159 public R call(AggregateService instance) throws IOException {
160 ServerRpcController controller = new ServerRpcController();
161 BlockingRpcCallback<AggregateResponse> rpcCallback =
162 new BlockingRpcCallback<AggregateResponse>();
163 instance.getMax(controller, requestArg, rpcCallback);
164 AggregateResponse response = rpcCallback.get();
165 if (controller.failedOnException()) {
166 throw controller.getFailedOn();
167 }
168 if (response.getFirstPartCount() > 0) {
169 ByteString b = response.getFirstPart(0);
170 Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
171 return ci.getCellValueFromProto(q);
172 }
173 return null;
174 }
175 }, aMaxCallBack);
176 return aMaxCallBack.getMax();
177 }
178
179
180
181
182
183 private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
184 if (scan == null
185 || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
186 .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
187 || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
188 !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
189 throw new IOException(
190 "Agg client Exception: Startrow should be smaller than Stoprow");
191 } else if (!canFamilyBeAbsent) {
192 if (scan.getFamilyMap().size() != 1) {
193 throw new IOException("There must be only one family.");
194 }
195 }
196 }
197
198
199
200
201
202
203
204
205
206
207
208 public <R, S, P extends Message, Q extends Message, T extends Message> R min(
209 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
210 throws Throwable {
211 try (Table table = connection.getTable(tableName)) {
212 return min(table, ci, scan);
213 }
214 }
215
216
217
218
219
220
221
222
223
224
225
226 public <R, S, P extends Message, Q extends Message, T extends Message>
227 R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
228 final Scan scan) throws Throwable {
229 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
230 class MinCallBack implements Batch.Callback<R> {
231
232 private R min = null;
233
234 public R getMinimum() {
235 return min;
236 }
237
238 @Override
239 public synchronized void update(byte[] region, byte[] row, R result) {
240 min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
241 }
242 }
243 MinCallBack minCallBack = new MinCallBack();
244 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
245 new Batch.Call<AggregateService, R>() {
246
247 @Override
248 public R call(AggregateService instance) throws IOException {
249 ServerRpcController controller = new ServerRpcController();
250 BlockingRpcCallback<AggregateResponse> rpcCallback =
251 new BlockingRpcCallback<AggregateResponse>();
252 instance.getMin(controller, requestArg, rpcCallback);
253 AggregateResponse response = rpcCallback.get();
254 if (controller.failedOnException()) {
255 throw controller.getFailedOn();
256 }
257 if (response.getFirstPartCount() > 0) {
258 ByteString b = response.getFirstPart(0);
259 Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
260 return ci.getCellValueFromProto(q);
261 }
262 return null;
263 }
264 }, minCallBack);
265 log.debug("Min fom all regions is: " + minCallBack.getMinimum());
266 return minCallBack.getMinimum();
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282 public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
283 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
284 throws Throwable {
285 try (Table table = connection.getTable(tableName)) {
286 return rowCount(table, ci, scan);
287 }
288 }
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303 public <R, S, P extends Message, Q extends Message, T extends Message>
304 long rowCount(final Table table,
305 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
306 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
307 class RowNumCallback implements Batch.Callback<Long> {
308 private final AtomicLong rowCountL = new AtomicLong(0);
309
310 public long getRowNumCount() {
311 return rowCountL.get();
312 }
313
314 @Override
315 public void update(byte[] region, byte[] row, Long result) {
316 rowCountL.addAndGet(result.longValue());
317 }
318 }
319 RowNumCallback rowNum = new RowNumCallback();
320 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
321 new Batch.Call<AggregateService, Long>() {
322 @Override
323 public Long call(AggregateService instance) throws IOException {
324 ServerRpcController controller = new ServerRpcController();
325 BlockingRpcCallback<AggregateResponse> rpcCallback =
326 new BlockingRpcCallback<AggregateResponse>();
327 instance.getRowNum(controller, requestArg, rpcCallback);
328 AggregateResponse response = rpcCallback.get();
329 if (controller.failedOnException()) {
330 throw controller.getFailedOn();
331 }
332 byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
333 ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
334 bb.rewind();
335 return bb.getLong();
336 }
337 }, rowNum);
338 return rowNum.getRowNumCount();
339 }
340
341
342
343
344
345
346
347
348
349
350 public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
351 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
352 throws Throwable {
353 try (Table table = connection.getTable(tableName)) {
354 return sum(table, ci, scan);
355 }
356 }
357
358
359
360
361
362
363
364
365
366
367 public <R, S, P extends Message, Q extends Message, T extends Message>
368 S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
369 final Scan scan) throws Throwable {
370 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
371
372 class SumCallBack implements Batch.Callback<S> {
373 S sumVal = null;
374
375 public S getSumResult() {
376 return sumVal;
377 }
378
379 @Override
380 public synchronized void update(byte[] region, byte[] row, S result) {
381 sumVal = ci.add(sumVal, result);
382 }
383 }
384 SumCallBack sumCallBack = new SumCallBack();
385 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
386 new Batch.Call<AggregateService, S>() {
387 @Override
388 public S call(AggregateService instance) throws IOException {
389 ServerRpcController controller = new ServerRpcController();
390 BlockingRpcCallback<AggregateResponse> rpcCallback =
391 new BlockingRpcCallback<AggregateResponse>();
392 instance.getSum(controller, requestArg, rpcCallback);
393 AggregateResponse response = rpcCallback.get();
394 if (controller.failedOnException()) {
395 throw controller.getFailedOn();
396 }
397 if (response.getFirstPartCount() == 0) {
398 return null;
399 }
400 ByteString b = response.getFirstPart(0);
401 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
402 S s = ci.getPromotedValueFromProto(t);
403 return s;
404 }
405 }, sumCallBack);
406 return sumCallBack.getSumResult();
407 }
408
409
410
411
412
413
414
415
416
417 private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
418 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
419 throws Throwable {
420 try (Table table = connection.getTable(tableName)) {
421 return getAvgArgs(table, ci, scan);
422 }
423 }
424
425
426
427
428
429
430
431
432
433 private <R, S, P extends Message, Q extends Message, T extends Message>
434 Pair<S, Long> getAvgArgs(final Table table,
435 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
436 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
437 class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
438 S sum = null;
439 Long rowCount = 0l;
440
441 public synchronized Pair<S, Long> getAvgArgs() {
442 return new Pair<S, Long>(sum, rowCount);
443 }
444
445 @Override
446 public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
447 sum = ci.add(sum, result.getFirst());
448 rowCount += result.getSecond();
449 }
450 }
451 AvgCallBack avgCallBack = new AvgCallBack();
452 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
453 new Batch.Call<AggregateService, Pair<S, Long>>() {
454 @Override
455 public Pair<S, Long> call(AggregateService instance) throws IOException {
456 ServerRpcController controller = new ServerRpcController();
457 BlockingRpcCallback<AggregateResponse> rpcCallback =
458 new BlockingRpcCallback<AggregateResponse>();
459 instance.getAvg(controller, requestArg, rpcCallback);
460 AggregateResponse response = rpcCallback.get();
461 if (controller.failedOnException()) {
462 throw controller.getFailedOn();
463 }
464 Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
465 if (response.getFirstPartCount() == 0) {
466 return pair;
467 }
468 ByteString b = response.getFirstPart(0);
469 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
470 S s = ci.getPromotedValueFromProto(t);
471 pair.setFirst(s);
472 ByteBuffer bb = ByteBuffer.allocate(8).put(
473 getBytesFromResponse(response.getSecondPart()));
474 bb.rewind();
475 pair.setSecond(bb.getLong());
476 return pair;
477 }
478 }, avgCallBack);
479 return avgCallBack.getAvgArgs();
480 }
481
482
483
484
485
486
487
488
489
490
491
492
493
494 public <R, S, P extends Message, Q extends Message, T extends Message>
495 double avg(final TableName tableName,
496 final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
497 Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
498 return ci.divideForAvg(p.getFirst(), p.getSecond());
499 }
500
501
502
503
504
505
506
507
508
509
510
511
512
513 public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
514 final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
515 Pair<S, Long> p = getAvgArgs(table, ci, scan);
516 return ci.divideForAvg(p.getFirst(), p.getSecond());
517 }
518
519
520
521
522
523
524
525
526
527
528
529
530 private <R, S, P extends Message, Q extends Message, T extends Message>
531 Pair<List<S>, Long> getStdArgs(final Table table,
532 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
533 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
534 class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
535 long rowCountVal = 0l;
536 S sumVal = null, sumSqVal = null;
537
538 public synchronized Pair<List<S>, Long> getStdParams() {
539 List<S> l = new ArrayList<S>();
540 l.add(sumVal);
541 l.add(sumSqVal);
542 Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
543 return p;
544 }
545
546 @Override
547 public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
548 if (result.getFirst().size() > 0) {
549 sumVal = ci.add(sumVal, result.getFirst().get(0));
550 sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
551 rowCountVal += result.getSecond();
552 }
553 }
554 }
555 StdCallback stdCallback = new StdCallback();
556 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
557 new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
558 @Override
559 public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
560 ServerRpcController controller = new ServerRpcController();
561 BlockingRpcCallback<AggregateResponse> rpcCallback =
562 new BlockingRpcCallback<AggregateResponse>();
563 instance.getStd(controller, requestArg, rpcCallback);
564 AggregateResponse response = rpcCallback.get();
565 if (controller.failedOnException()) {
566 throw controller.getFailedOn();
567 }
568 Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
569 if (response.getFirstPartCount() == 0) {
570 return pair;
571 }
572 List<S> list = new ArrayList<S>();
573 for (int i = 0; i < response.getFirstPartCount(); i++) {
574 ByteString b = response.getFirstPart(i);
575 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
576 S s = ci.getPromotedValueFromProto(t);
577 list.add(s);
578 }
579 pair.setFirst(list);
580 ByteBuffer bb = ByteBuffer.allocate(8).put(
581 getBytesFromResponse(response.getSecondPart()));
582 bb.rewind();
583 pair.setSecond(bb.getLong());
584 return pair;
585 }
586 }, stdCallback);
587 return stdCallback.getStdParams();
588 }
589
590
591
592
593
594
595
596
597
598
599
600
601
602 public <R, S, P extends Message, Q extends Message, T extends Message>
603 double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
604 Scan scan) throws Throwable {
605 try (Table table = connection.getTable(tableName)) {
606 return std(table, ci, scan);
607 }
608 }
609
610
611
612
613
614
615
616
617
618
619
620
621
622 public <R, S, P extends Message, Q extends Message, T extends Message> double std(
623 final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
624 Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
625 double res = 0d;
626 double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
627 double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
628 res = avgOfSumSq - (avg) * (avg);
629 res = Math.pow(res, 0.5);
630 return res;
631 }
632
633
634
635
636
637
638
639
640
641
642
643
644
645 private <R, S, P extends Message, Q extends Message, T extends Message>
646 Pair<NavigableMap<byte[], List<S>>, List<S>>
647 getMedianArgs(final Table table,
648 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
649 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
650 final NavigableMap<byte[], List<S>> map =
651 new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
652 class StdCallback implements Batch.Callback<List<S>> {
653 S sumVal = null, sumWeights = null;
654
655 public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
656 List<S> l = new ArrayList<S>();
657 l.add(sumVal);
658 l.add(sumWeights);
659 Pair<NavigableMap<byte[], List<S>>, List<S>> p =
660 new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
661 return p;
662 }
663
664 @Override
665 public synchronized void update(byte[] region, byte[] row, List<S> result) {
666 map.put(row, result);
667 sumVal = ci.add(sumVal, result.get(0));
668 sumWeights = ci.add(sumWeights, result.get(1));
669 }
670 }
671 StdCallback stdCallback = new StdCallback();
672 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
673 new Batch.Call<AggregateService, List<S>>() {
674 @Override
675 public List<S> call(AggregateService instance) throws IOException {
676 ServerRpcController controller = new ServerRpcController();
677 BlockingRpcCallback<AggregateResponse> rpcCallback =
678 new BlockingRpcCallback<AggregateResponse>();
679 instance.getMedian(controller, requestArg, rpcCallback);
680 AggregateResponse response = rpcCallback.get();
681 if (controller.failedOnException()) {
682 throw controller.getFailedOn();
683 }
684
685 List<S> list = new ArrayList<S>();
686 for (int i = 0; i < response.getFirstPartCount(); i++) {
687 ByteString b = response.getFirstPart(i);
688 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
689 S s = ci.getPromotedValueFromProto(t);
690 list.add(s);
691 }
692 return list;
693 }
694
695 }, stdCallback);
696 return stdCallback.getMedianParams();
697 }
698
699
700
701
702
703
704
705
706
707
708
709 public <R, S, P extends Message, Q extends Message, T extends Message>
710 R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
711 Scan scan) throws Throwable {
712 try (Table table = connection.getTable(tableName)) {
713 return median(table, ci, scan);
714 }
715 }
716
717
718
719
720
721
722
723
724
725
726
727 public <R, S, P extends Message, Q extends Message, T extends Message>
728 R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
729 Scan scan) throws Throwable {
730 Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
731 byte[] startRow = null;
732 byte[] colFamily = scan.getFamilies()[0];
733 NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
734 NavigableMap<byte[], List<S>> map = p.getFirst();
735 S sumVal = p.getSecond().get(0);
736 S sumWeights = p.getSecond().get(1);
737 double halfSumVal = ci.divideForAvg(sumVal, 2L);
738 double movingSumVal = 0;
739 boolean weighted = false;
740 if (quals.size() > 1) {
741 weighted = true;
742 halfSumVal = ci.divideForAvg(sumWeights, 2L);
743 }
744
745 for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
746 S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
747 double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
748 if (newSumVal > halfSumVal) break;
749 movingSumVal = newSumVal;
750 startRow = entry.getKey();
751 }
752
753 Scan scan2 = new Scan(scan);
754
755 if (startRow != null) scan2.setStartRow(startRow);
756 ResultScanner scanner = null;
757 try {
758 int cacheSize = scan2.getCaching();
759 if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
760 scan2.setCacheBlocks(true);
761 cacheSize = 5;
762 scan2.setCaching(cacheSize);
763 }
764 scanner = table.getScanner(scan2);
765 Result[] results = null;
766 byte[] qualifier = quals.pollFirst();
767
768 byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
769 R value = null;
770 do {
771 results = scanner.next(cacheSize);
772 if (results != null && results.length > 0) {
773 for (int i = 0; i < results.length; i++) {
774 Result r = results[i];
775
776 Cell kv = r.getColumnLatest(colFamily, weightQualifier);
777 R newValue = ci.getValue(colFamily, weightQualifier, kv);
778 S s = ci.castToReturnType(newValue);
779 double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
780
781 if (newSumVal > halfSumVal) {
782 return value;
783 }
784 movingSumVal = newSumVal;
785 kv = r.getColumnLatest(colFamily, qualifier);
786 value = ci.getValue(colFamily, qualifier, kv);
787 }
788 }
789 } while (results != null && results.length > 0);
790 } finally {
791 if (scanner != null) {
792 scanner.close();
793 }
794 }
795 return null;
796 }
797
798 <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
799 validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
800 throws IOException {
801 validateParameters(scan, canFamilyBeAbsent);
802 final AggregateRequest.Builder requestBuilder =
803 AggregateRequest.newBuilder();
804 requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
805 P columnInterpreterSpecificData = null;
806 if ((columnInterpreterSpecificData = ci.getRequestData())
807 != null) {
808 requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
809 }
810 requestBuilder.setScan(ProtobufUtil.toScan(scan));
811 return requestBuilder.build();
812 }
813
814 byte[] getBytesFromResponse(ByteString response) {
815 ByteBuffer bb = response.asReadOnlyByteBuffer();
816 bb.rewind();
817 byte[] bytes;
818 if (bb.hasArray()) {
819 bytes = bb.array();
820 } else {
821 bytes = response.toByteArray();
822 }
823 return bytes;
824 }
825 }