1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client.coprocessor;
21
22 import com.google.protobuf.ByteString;
23 import com.google.protobuf.Message;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.KeyValue;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.client.ResultScanner;
34 import org.apache.hadoop.hbase.client.Scan;
35 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
36 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
37 import org.apache.hadoop.hbase.ipc.ServerRpcController;
38 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateArgument;
40 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
41 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.hbase.util.Pair;
44
45 import java.io.IOException;
46 import java.nio.ByteBuffer;
47 import java.util.ArrayList;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.NavigableMap;
51 import java.util.NavigableSet;
52 import java.util.TreeMap;
53 import java.util.concurrent.atomic.AtomicLong;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.Public
75 @InterfaceStability.Evolving
76 public class AggregationClient {
77
78 private static final Log log = LogFactory.getLog(AggregationClient.class);
79 Configuration conf;
80
81
82
83
84
/**
 * Creates a client that keeps the given configuration in {@code conf}.
 *
 * @param cfg configuration later handed to {@code new HTable(conf, tableName)} by the
 *          overloads that take a table name
 */
public AggregationClient(Configuration cfg) {
  conf = cfg;
}
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public <R, S, P extends Message, Q extends Message, T extends Message> R max(
102 final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
103 throws Throwable {
104 HTable table = null;
105 try {
106 table = new HTable(conf, tableName);
107 return max(table, ci, scan);
108 } finally {
109 if (table != null) {
110 table.close();
111 }
112 }
113 }
114
115
116
117
118
119
120
121
122
123
124
125
126
127 public <R, S, P extends Message, Q extends Message, T extends Message>
128 R max(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
129 final Scan scan) throws Throwable {
130 final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
131 class MaxCallBack implements Batch.Callback<R> {
132 R max = null;
133
134 R getMax() {
135 return max;
136 }
137
138 @Override
139 public synchronized void update(byte[] region, byte[] row, R result) {
140 max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
141 }
142 }
143 MaxCallBack aMaxCallBack = new MaxCallBack();
144 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
145 new Batch.Call<AggregateService, R>() {
146 @Override
147 public R call(AggregateService instance) throws IOException {
148 ServerRpcController controller = new ServerRpcController();
149 BlockingRpcCallback<AggregateResponse> rpcCallback =
150 new BlockingRpcCallback<AggregateResponse>();
151 instance.getMax(controller, requestArg, rpcCallback);
152 AggregateResponse response = rpcCallback.get();
153 if (controller.failedOnException()) {
154 throw controller.getFailedOn();
155 }
156 if (response.getFirstPartCount() > 0) {
157 ByteString b = response.getFirstPart(0);
158 Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
159 return ci.getCellValueFromProto(q);
160 }
161 return null;
162 }
163 }, aMaxCallBack);
164 return aMaxCallBack.getMax();
165 }
166
167 private void validateParameters(Scan scan) throws IOException {
168 if (scan == null
169 || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
170 .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
171 || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) &&
172 !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
173 throw new IOException(
174 "Agg client Exception: Startrow should be smaller than Stoprow");
175 } else if (scan.getFamilyMap().size() != 1) {
176 throw new IOException("There must be only one family.");
177 }
178 }
179
180
181
182
183
184
185
186
187
188
189
190 public <R, S, P extends Message, Q extends Message, T extends Message> R min(
191 final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
192 throws Throwable {
193 HTable table = null;
194 try {
195 table = new HTable(conf, tableName);
196 return min(table, ci, scan);
197 } finally {
198 if (table != null) {
199 table.close();
200 }
201 }
202 }
203
204
205
206
207
208
209
210
211
212
213
214 public <R, S, P extends Message, Q extends Message, T extends Message>
215 R min(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
216 final Scan scan) throws Throwable {
217 final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
218 class MinCallBack implements Batch.Callback<R> {
219
220 private R min = null;
221
222 public R getMinimum() {
223 return min;
224 }
225
226 @Override
227 public synchronized void update(byte[] region, byte[] row, R result) {
228 min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
229 }
230 }
231 MinCallBack minCallBack = new MinCallBack();
232 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
233 new Batch.Call<AggregateService, R>() {
234
235 @Override
236 public R call(AggregateService instance) throws IOException {
237 ServerRpcController controller = new ServerRpcController();
238 BlockingRpcCallback<AggregateResponse> rpcCallback =
239 new BlockingRpcCallback<AggregateResponse>();
240 instance.getMin(controller, requestArg, rpcCallback);
241 AggregateResponse response = rpcCallback.get();
242 if (controller.failedOnException()) {
243 throw controller.getFailedOn();
244 }
245 if (response.getFirstPartCount() > 0) {
246 ByteString b = response.getFirstPart(0);
247 Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
248 return ci.getCellValueFromProto(q);
249 }
250 return null;
251 }
252 }, minCallBack);
253 log.debug("Min fom all regions is: " + minCallBack.getMinimum());
254 return minCallBack.getMinimum();
255 }
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270 public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
271 final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
272 throws Throwable {
273 HTable table = null;
274 try {
275 table = new HTable(conf, tableName);
276 return rowCount(table, ci, scan);
277 } finally {
278 if (table != null) {
279 table.close();
280 }
281 }
282 }
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297 public <R, S, P extends Message, Q extends Message, T extends Message>
298 long rowCount(final HTable table,
299 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
300 final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
301 class RowNumCallback implements Batch.Callback<Long> {
302 private final AtomicLong rowCountL = new AtomicLong(0);
303
304 public long getRowNumCount() {
305 return rowCountL.get();
306 }
307
308 @Override
309 public void update(byte[] region, byte[] row, Long result) {
310 rowCountL.addAndGet(result.longValue());
311 }
312 }
313 RowNumCallback rowNum = new RowNumCallback();
314 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
315 new Batch.Call<AggregateService, Long>() {
316 @Override
317 public Long call(AggregateService instance) throws IOException {
318 ServerRpcController controller = new ServerRpcController();
319 BlockingRpcCallback<AggregateResponse> rpcCallback =
320 new BlockingRpcCallback<AggregateResponse>();
321 instance.getRowNum(controller, requestArg, rpcCallback);
322 AggregateResponse response = rpcCallback.get();
323 if (controller.failedOnException()) {
324 throw controller.getFailedOn();
325 }
326 byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
327 ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
328 bb.rewind();
329 return bb.getLong();
330 }
331 }, rowNum);
332 return rowNum.getRowNumCount();
333 }
334
335
336
337
338
339
340
341
342
343
344 public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
345 final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
346 throws Throwable {
347 HTable table = null;
348 try {
349 table = new HTable(conf, tableName);
350 return sum(table, ci, scan);
351 } finally {
352 if (table != null) {
353 table.close();
354 }
355 }
356 }
357
358
359
360
361
362
363
364
365
366
367 public <R, S, P extends Message, Q extends Message, T extends Message>
368 S sum(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
369 final Scan scan) throws Throwable {
370 final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
371
372 class SumCallBack implements Batch.Callback<S> {
373 S sumVal = null;
374
375 public S getSumResult() {
376 return sumVal;
377 }
378
379 @Override
380 public synchronized void update(byte[] region, byte[] row, S result) {
381 sumVal = ci.add(sumVal, result);
382 }
383 }
384 SumCallBack sumCallBack = new SumCallBack();
385 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
386 new Batch.Call<AggregateService, S>() {
387 @Override
388 public S call(AggregateService instance) throws IOException {
389 ServerRpcController controller = new ServerRpcController();
390 BlockingRpcCallback<AggregateResponse> rpcCallback =
391 new BlockingRpcCallback<AggregateResponse>();
392 instance.getSum(controller, requestArg, rpcCallback);
393 AggregateResponse response = rpcCallback.get();
394 if (controller.failedOnException()) {
395 throw controller.getFailedOn();
396 }
397 if (response.getFirstPartCount() == 0) {
398 return null;
399 }
400 ByteString b = response.getFirstPart(0);
401 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
402 S s = ci.getPromotedValueFromProto(t);
403 return s;
404 }
405 }, sumCallBack);
406 return sumCallBack.getSumResult();
407 }
408
409
410
411
412
413
414
415
416
417 private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
418 final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
419 throws Throwable {
420 HTable table = null;
421 try {
422 table = new HTable(conf, tableName);
423 return getAvgArgs(table, ci, scan);
424 } finally {
425 if (table != null) {
426 table.close();
427 }
428 }
429 }
430
431
432
433
434
435
436
437
438
439 private <R, S, P extends Message, Q extends Message, T extends Message>
440 Pair<S, Long> getAvgArgs(final HTable table,
441 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
442 final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
443 class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
444 S sum = null;
445 Long rowCount = 0l;
446
447 public Pair<S, Long> getAvgArgs() {
448 return new Pair<S, Long>(sum, rowCount);
449 }
450
451 @Override
452 public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
453 sum = ci.add(sum, result.getFirst());
454 rowCount += result.getSecond();
455 }
456 }
457 AvgCallBack avgCallBack = new AvgCallBack();
458 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
459 new Batch.Call<AggregateService, Pair<S, Long>>() {
460 @Override
461 public Pair<S, Long> call(AggregateService instance) throws IOException {
462 ServerRpcController controller = new ServerRpcController();
463 BlockingRpcCallback<AggregateResponse> rpcCallback =
464 new BlockingRpcCallback<AggregateResponse>();
465 instance.getAvg(controller, requestArg, rpcCallback);
466 AggregateResponse response = rpcCallback.get();
467 if (controller.failedOnException()) {
468 throw controller.getFailedOn();
469 }
470 Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
471 if (response.getFirstPartCount() == 0) {
472 return pair;
473 }
474 ByteString b = response.getFirstPart(0);
475 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
476 S s = ci.getPromotedValueFromProto(t);
477 pair.setFirst(s);
478 ByteBuffer bb = ByteBuffer.allocate(8).put(
479 getBytesFromResponse(response.getSecondPart()));
480 bb.rewind();
481 pair.setSecond(bb.getLong());
482 return pair;
483 }
484 }, avgCallBack);
485 return avgCallBack.getAvgArgs();
486 }
487
488
489
490
491
492
493
494
495
496
497
498
499
500 public <R, S, P extends Message, Q extends Message, T extends Message>
501 double avg(final byte[] tableName,
502 final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
503 Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
504 return ci.divideForAvg(p.getFirst(), p.getSecond());
505 }
506
507
508
509
510
511
512
513
514
515
516
517
518
519 public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
520 final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
521 Pair<S, Long> p = getAvgArgs(table, ci, scan);
522 return ci.divideForAvg(p.getFirst(), p.getSecond());
523 }
524
525
526
527
528
529
530
531
532
533
534
535
536 private <R, S, P extends Message, Q extends Message, T extends Message>
537 Pair<List<S>, Long> getStdArgs(final HTable table,
538 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
539 final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
540 class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
541 long rowCountVal = 0l;
542 S sumVal = null, sumSqVal = null;
543
544 public Pair<List<S>, Long> getStdParams() {
545 List<S> l = new ArrayList<S>();
546 l.add(sumVal);
547 l.add(sumSqVal);
548 Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
549 return p;
550 }
551
552 @Override
553 public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
554 if (result.getFirst().size() > 0) {
555 sumVal = ci.add(sumVal, result.getFirst().get(0));
556 sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
557 rowCountVal += result.getSecond();
558 }
559 }
560 }
561 StdCallback stdCallback = new StdCallback();
562 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
563 new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
564 @Override
565 public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
566 ServerRpcController controller = new ServerRpcController();
567 BlockingRpcCallback<AggregateResponse> rpcCallback =
568 new BlockingRpcCallback<AggregateResponse>();
569 instance.getStd(controller, requestArg, rpcCallback);
570 AggregateResponse response = rpcCallback.get();
571 if (controller.failedOnException()) {
572 throw controller.getFailedOn();
573 }
574 Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
575 if (response.getFirstPartCount() == 0) {
576 return pair;
577 }
578 List<S> list = new ArrayList<S>();
579 for (int i = 0; i < response.getFirstPartCount(); i++) {
580 ByteString b = response.getFirstPart(i);
581 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
582 S s = ci.getPromotedValueFromProto(t);
583 list.add(s);
584 }
585 pair.setFirst(list);
586 ByteBuffer bb = ByteBuffer.allocate(8).put(
587 getBytesFromResponse(response.getSecondPart()));
588 bb.rewind();
589 pair.setSecond(bb.getLong());
590 return pair;
591 }
592 }, stdCallback);
593 return stdCallback.getStdParams();
594 }
595
596
597
598
599
600
601
602
603
604
605
606
607
608 public <R, S, P extends Message, Q extends Message, T extends Message>
609 double std(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
610 Scan scan) throws Throwable {
611 HTable table = null;
612 try {
613 table = new HTable(conf, tableName);
614 return std(table, ci, scan);
615 } finally {
616 if (table != null) {
617 table.close();
618 }
619 }
620 }
621
622
623
624
625
626
627
628
629
630
631
632
633
634 public <R, S, P extends Message, Q extends Message, T extends Message> double std(
635 final HTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
636 Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
637 double res = 0d;
638 double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
639 double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
640 res = avgOfSumSq - (avg) * (avg);
641 res = Math.pow(res, 0.5);
642 return res;
643 }
644
645
646
647
648
649
650
651
652
653
654
655
656
657 private <R, S, P extends Message, Q extends Message, T extends Message>
658 Pair<NavigableMap<byte[], List<S>>, List<S>>
659 getMedianArgs(final HTable table,
660 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
661 final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
662 final NavigableMap<byte[], List<S>> map =
663 new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
664 class StdCallback implements Batch.Callback<List<S>> {
665 S sumVal = null, sumWeights = null;
666
667 public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
668 List<S> l = new ArrayList<S>();
669 l.add(sumVal);
670 l.add(sumWeights);
671 Pair<NavigableMap<byte[], List<S>>, List<S>> p =
672 new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
673 return p;
674 }
675
676 @Override
677 public synchronized void update(byte[] region, byte[] row, List<S> result) {
678 map.put(row, result);
679 sumVal = ci.add(sumVal, result.get(0));
680 sumWeights = ci.add(sumWeights, result.get(1));
681 }
682 }
683 StdCallback stdCallback = new StdCallback();
684 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
685 new Batch.Call<AggregateService, List<S>>() {
686 @Override
687 public List<S> call(AggregateService instance) throws IOException {
688 ServerRpcController controller = new ServerRpcController();
689 BlockingRpcCallback<AggregateResponse> rpcCallback =
690 new BlockingRpcCallback<AggregateResponse>();
691 instance.getMedian(controller, requestArg, rpcCallback);
692 AggregateResponse response = rpcCallback.get();
693 if (controller.failedOnException()) {
694 throw controller.getFailedOn();
695 }
696
697 List<S> list = new ArrayList<S>();
698 for (int i = 0; i < response.getFirstPartCount(); i++) {
699 ByteString b = response.getFirstPart(i);
700 T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
701 S s = ci.getPromotedValueFromProto(t);
702 list.add(s);
703 }
704 return list;
705 }
706
707 }, stdCallback);
708 return stdCallback.getMedianParams();
709 }
710
711
712
713
714
715
716
717
718
719
720
721 public <R, S, P extends Message, Q extends Message, T extends Message>
722 R median(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
723 Scan scan) throws Throwable {
724 HTable table = null;
725 try {
726 table = new HTable(conf, tableName);
727 return median(table, ci, scan);
728 } finally {
729 if (table != null) {
730 table.close();
731 }
732 }
733 }
734
735
736
737
738
739
740
741
742
743
744
745 public <R, S, P extends Message, Q extends Message, T extends Message>
746 R median(final HTable table, ColumnInterpreter<R, S, P, Q, T> ci,
747 Scan scan) throws Throwable {
748 Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
749 byte[] startRow = null;
750 byte[] colFamily = scan.getFamilies()[0];
751 NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
752 NavigableMap<byte[], List<S>> map = p.getFirst();
753 S sumVal = p.getSecond().get(0);
754 S sumWeights = p.getSecond().get(1);
755 double halfSumVal = ci.divideForAvg(sumVal, 2L);
756 double movingSumVal = 0;
757 boolean weighted = false;
758 if (quals.size() > 1) {
759 weighted = true;
760 halfSumVal = ci.divideForAvg(sumWeights, 2L);
761 }
762
763 for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
764 S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
765 double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
766 if (newSumVal > halfSumVal) break;
767 movingSumVal = newSumVal;
768 startRow = entry.getKey();
769 }
770
771 Scan scan2 = new Scan(scan);
772
773 if (startRow != null) scan2.setStartRow(startRow);
774 ResultScanner scanner = null;
775 try {
776 int cacheSize = scan2.getCaching();
777 if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
778 scan2.setCacheBlocks(true);
779 cacheSize = 5;
780 scan2.setCaching(cacheSize);
781 }
782 scanner = table.getScanner(scan2);
783 Result[] results = null;
784 byte[] qualifier = quals.pollFirst();
785
786 byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
787 R value = null;
788 do {
789 results = scanner.next(cacheSize);
790 if (results != null && results.length > 0) {
791 for (int i = 0; i < results.length; i++) {
792 Result r = results[i];
793
794 KeyValue kv = r.getColumnLatest(colFamily, weightQualifier);
795 R newValue = ci.getValue(colFamily, weightQualifier, kv);
796 S s = ci.castToReturnType(newValue);
797 double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
798
799 if (newSumVal > halfSumVal) {
800 return value;
801 }
802 movingSumVal = newSumVal;
803 kv = r.getColumnLatest(colFamily, qualifier);
804 value = ci.getValue(colFamily, qualifier, kv);
805 }
806 }
807 } while (results != null && results.length > 0);
808 } finally {
809 if (scanner != null) {
810 scanner.close();
811 }
812 }
813 return null;
814 }
815
816 <R, S, P extends Message, Q extends Message, T extends Message> AggregateArgument
817 validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci)
818 throws IOException {
819 validateParameters(scan);
820 final AggregateArgument.Builder requestBuilder =
821 AggregateArgument.newBuilder();
822 requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
823 P columnInterpreterSpecificData = null;
824 if ((columnInterpreterSpecificData = ci.getRequestData())
825 != null) {
826 requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
827 }
828 requestBuilder.setScan(ProtobufUtil.toScan(scan));
829 return requestBuilder.build();
830 }
831
832 byte[] getBytesFromResponse(ByteString response) {
833 ByteBuffer bb = response.asReadOnlyByteBuffer();
834 bb.rewind();
835 byte[] bytes;
836 if (bb.hasArray()) {
837 bytes = bb.array();
838 } else {
839 bytes = response.toByteArray();
840 }
841 return bytes;
842 }
843 }