1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.NavigableSet;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.Coprocessor;
31 import org.apache.hadoop.hbase.CoprocessorEnvironment;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.client.Scan;
34 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
37 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
38 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
39 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
40 import org.apache.hadoop.hbase.regionserver.InternalScanner;
41
42 import com.google.protobuf.ByteString;
43 import com.google.protobuf.Message;
44 import com.google.protobuf.RpcCallback;
45 import com.google.protobuf.RpcController;
46 import com.google.protobuf.Service;
47
48
49
50
51
52
53
54
55
56
57
58
59
60 @InterfaceAudience.Private
61 public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
62 extends AggregateService implements CoprocessorService, Coprocessor {
63 protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
64 private RegionCoprocessorEnvironment env;
65
66
67
68
69
70
71
72
73 @Override
74 public void getMax(RpcController controller, AggregateRequest request,
75 RpcCallback<AggregateResponse> done) {
76 InternalScanner scanner = null;
77 AggregateResponse response = null;
78 T max = null;
79 try {
80 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
81 T temp;
82 Scan scan = ProtobufUtil.toScan(request.getScan());
83 scanner = env.getRegion().getScanner(scan);
84 List<Cell> results = new ArrayList<Cell>();
85 byte[] colFamily = scan.getFamilies()[0];
86 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
87 byte[] qualifier = null;
88 if (qualifiers != null && !qualifiers.isEmpty()) {
89 qualifier = qualifiers.pollFirst();
90 }
91
92 boolean hasMoreRows = false;
93 do {
94 hasMoreRows = scanner.next(results);
95 int listSize = results.size();
96 for (int i = 0; i < listSize; i++) {
97 temp = ci.getValue(colFamily, qualifier, results.get(i));
98 max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
99 }
100 results.clear();
101 } while (hasMoreRows);
102 if (max != null) {
103 AggregateResponse.Builder builder = AggregateResponse.newBuilder();
104 builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
105 response = builder.build();
106 }
107 } catch (IOException e) {
108 ResponseConverter.setControllerException(controller, e);
109 } finally {
110 if (scanner != null) {
111 try {
112 scanner.close();
113 } catch (IOException ignored) {}
114 }
115 }
116 log.info("Maximum from this region is "
117 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + max);
118 done.run(response);
119 }
120
121
122
123
124
125
126
127
128 @Override
129 public void getMin(RpcController controller, AggregateRequest request,
130 RpcCallback<AggregateResponse> done) {
131 AggregateResponse response = null;
132 InternalScanner scanner = null;
133 T min = null;
134 try {
135 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
136 T temp;
137 Scan scan = ProtobufUtil.toScan(request.getScan());
138 scanner = env.getRegion().getScanner(scan);
139 List<Cell> results = new ArrayList<Cell>();
140 byte[] colFamily = scan.getFamilies()[0];
141 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
142 byte[] qualifier = null;
143 if (qualifiers != null && !qualifiers.isEmpty()) {
144 qualifier = qualifiers.pollFirst();
145 }
146 boolean hasMoreRows = false;
147 do {
148 hasMoreRows = scanner.next(results);
149 int listSize = results.size();
150 for (int i = 0; i < listSize; i++) {
151 temp = ci.getValue(colFamily, qualifier, results.get(i));
152 min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
153 }
154 results.clear();
155 } while (hasMoreRows);
156 if (min != null) {
157 response = AggregateResponse.newBuilder().addFirstPart(
158 ci.getProtoForCellType(min).toByteString()).build();
159 }
160 } catch (IOException e) {
161 ResponseConverter.setControllerException(controller, e);
162 } finally {
163 if (scanner != null) {
164 try {
165 scanner.close();
166 } catch (IOException ignored) {}
167 }
168 }
169 log.info("Minimum from this region is "
170 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + min);
171 done.run(response);
172 }
173
174
175
176
177
178
179
180
181 @Override
182 public void getSum(RpcController controller, AggregateRequest request,
183 RpcCallback<AggregateResponse> done) {
184 AggregateResponse response = null;
185 InternalScanner scanner = null;
186 long sum = 0l;
187 try {
188 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
189 S sumVal = null;
190 T temp;
191 Scan scan = ProtobufUtil.toScan(request.getScan());
192 scanner = env.getRegion().getScanner(scan);
193 byte[] colFamily = scan.getFamilies()[0];
194 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
195 byte[] qualifier = null;
196 if (qualifiers != null && !qualifiers.isEmpty()) {
197 qualifier = qualifiers.pollFirst();
198 }
199 List<Cell> results = new ArrayList<Cell>();
200 boolean hasMoreRows = false;
201 do {
202 hasMoreRows = scanner.next(results);
203 int listSize = results.size();
204 for (int i = 0; i < listSize; i++) {
205 temp = ci.getValue(colFamily, qualifier, results.get(i));
206 if (temp != null)
207 sumVal = ci.add(sumVal, ci.castToReturnType(temp));
208 }
209 results.clear();
210 } while (hasMoreRows);
211 if (sumVal != null) {
212 response = AggregateResponse.newBuilder().addFirstPart(
213 ci.getProtoForPromotedType(sumVal).toByteString()).build();
214 }
215 } catch (IOException e) {
216 ResponseConverter.setControllerException(controller, e);
217 } finally {
218 if (scanner != null) {
219 try {
220 scanner.close();
221 } catch (IOException ignored) {}
222 }
223 }
224 log.debug("Sum from this region is "
225 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum);
226 done.run(response);
227 }
228
229
230
231
232
233
234 @Override
235 public void getRowNum(RpcController controller, AggregateRequest request,
236 RpcCallback<AggregateResponse> done) {
237 AggregateResponse response = null;
238 long counter = 0l;
239 List<Cell> results = new ArrayList<Cell>();
240 InternalScanner scanner = null;
241 try {
242 Scan scan = ProtobufUtil.toScan(request.getScan());
243 byte[][] colFamilies = scan.getFamilies();
244 byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
245 NavigableSet<byte[]> qualifiers = colFamilies != null ?
246 scan.getFamilyMap().get(colFamily) : null;
247 byte[] qualifier = null;
248 if (qualifiers != null && !qualifiers.isEmpty()) {
249 qualifier = qualifiers.pollFirst();
250 }
251 if (scan.getFilter() == null && qualifier == null)
252 scan.setFilter(new FirstKeyOnlyFilter());
253 scanner = env.getRegion().getScanner(scan);
254 boolean hasMoreRows = false;
255 do {
256 hasMoreRows = scanner.next(results);
257 if (results.size() > 0) {
258 counter++;
259 }
260 results.clear();
261 } while (hasMoreRows);
262 ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
263 bb.rewind();
264 response = AggregateResponse.newBuilder().addFirstPart(
265 ByteString.copyFrom(bb)).build();
266 } catch (IOException e) {
267 ResponseConverter.setControllerException(controller, e);
268 } finally {
269 if (scanner != null) {
270 try {
271 scanner.close();
272 } catch (IOException ignored) {}
273 }
274 }
275 log.info("Row counter from this region is "
276 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
277 done.run(response);
278 }
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293 @Override
294 public void getAvg(RpcController controller, AggregateRequest request,
295 RpcCallback<AggregateResponse> done) {
296 AggregateResponse response = null;
297 InternalScanner scanner = null;
298 try {
299 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
300 S sumVal = null;
301 Long rowCountVal = 0l;
302 Scan scan = ProtobufUtil.toScan(request.getScan());
303 scanner = env.getRegion().getScanner(scan);
304 byte[] colFamily = scan.getFamilies()[0];
305 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
306 byte[] qualifier = null;
307 if (qualifiers != null && !qualifiers.isEmpty()) {
308 qualifier = qualifiers.pollFirst();
309 }
310 List<Cell> results = new ArrayList<Cell>();
311 boolean hasMoreRows = false;
312
313 do {
314 results.clear();
315 hasMoreRows = scanner.next(results);
316 int listSize = results.size();
317 for (int i = 0; i < listSize; i++) {
318 sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
319 qualifier, results.get(i))));
320 }
321 rowCountVal++;
322 } while (hasMoreRows);
323 if (sumVal != null) {
324 ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
325 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
326 pair.addFirstPart(first);
327 ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
328 bb.rewind();
329 pair.setSecondPart(ByteString.copyFrom(bb));
330 response = pair.build();
331 }
332 } catch (IOException e) {
333 ResponseConverter.setControllerException(controller, e);
334 } finally {
335 if (scanner != null) {
336 try {
337 scanner.close();
338 } catch (IOException ignored) {}
339 }
340 }
341 done.run(response);
342 }
343
344
345
346
347
348
349
350
351
352
353 @Override
354 public void getStd(RpcController controller, AggregateRequest request,
355 RpcCallback<AggregateResponse> done) {
356 InternalScanner scanner = null;
357 AggregateResponse response = null;
358 try {
359 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
360 S sumVal = null, sumSqVal = null, tempVal = null;
361 long rowCountVal = 0l;
362 Scan scan = ProtobufUtil.toScan(request.getScan());
363 scanner = env.getRegion().getScanner(scan);
364 byte[] colFamily = scan.getFamilies()[0];
365 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
366 byte[] qualifier = null;
367 if (qualifiers != null && !qualifiers.isEmpty()) {
368 qualifier = qualifiers.pollFirst();
369 }
370 List<Cell> results = new ArrayList<Cell>();
371
372 boolean hasMoreRows = false;
373
374 do {
375 tempVal = null;
376 hasMoreRows = scanner.next(results);
377 int listSize = results.size();
378 for (int i = 0; i < listSize; i++) {
379 tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
380 qualifier, results.get(i))));
381 }
382 results.clear();
383 sumVal = ci.add(sumVal, tempVal);
384 sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
385 rowCountVal++;
386 } while (hasMoreRows);
387 if (sumVal != null) {
388 ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
389 ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
390 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
391 pair.addFirstPart(first_sumVal);
392 pair.addFirstPart(first_sumSqVal);
393 ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
394 bb.rewind();
395 pair.setSecondPart(ByteString.copyFrom(bb));
396 response = pair.build();
397 }
398 } catch (IOException e) {
399 ResponseConverter.setControllerException(controller, e);
400 } finally {
401 if (scanner != null) {
402 try {
403 scanner.close();
404 } catch (IOException ignored) {}
405 }
406 }
407 done.run(response);
408 }
409
410
411
412
413
414
415
416
417
418 @Override
419 public void getMedian(RpcController controller, AggregateRequest request,
420 RpcCallback<AggregateResponse> done) {
421 AggregateResponse response = null;
422 InternalScanner scanner = null;
423 try {
424 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
425 S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
426 Scan scan = ProtobufUtil.toScan(request.getScan());
427 scanner = env.getRegion().getScanner(scan);
428 byte[] colFamily = scan.getFamilies()[0];
429 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
430 byte[] valQualifier = null, weightQualifier = null;
431 if (qualifiers != null && !qualifiers.isEmpty()) {
432 valQualifier = qualifiers.pollFirst();
433
434 weightQualifier = qualifiers.pollLast();
435 }
436 List<Cell> results = new ArrayList<Cell>();
437
438 boolean hasMoreRows = false;
439
440 do {
441 tempVal = null;
442 tempWeight = null;
443 hasMoreRows = scanner.next(results);
444 int listSize = results.size();
445 for (int i = 0; i < listSize; i++) {
446 Cell kv = results.get(i);
447 tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
448 valQualifier, kv)));
449 if (weightQualifier != null) {
450 tempWeight = ci.add(tempWeight,
451 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
452 }
453 }
454 results.clear();
455 sumVal = ci.add(sumVal, tempVal);
456 sumWeights = ci.add(sumWeights, tempWeight);
457 } while (hasMoreRows);
458 ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
459 S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
460 ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
461 AggregateResponse.Builder pair = AggregateResponse.newBuilder();
462 pair.addFirstPart(first_sumVal);
463 pair.addFirstPart(first_sumWeights);
464 response = pair.build();
465 } catch (IOException e) {
466 ResponseConverter.setControllerException(controller, e);
467 } finally {
468 if (scanner != null) {
469 try {
470 scanner.close();
471 } catch (IOException ignored) {}
472 }
473 }
474 done.run(response);
475 }
476
477 @SuppressWarnings("unchecked")
478 ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
479 AggregateRequest request) throws IOException {
480 String className = request.getInterpreterClassName();
481 Class<?> cls;
482 try {
483 cls = Class.forName(className);
484 ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
485 if (request.hasInterpreterSpecificBytes()) {
486 ByteString b = request.getInterpreterSpecificBytes();
487 P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
488 ci.initialize(initMsg);
489 }
490 return ci;
491 } catch (ClassNotFoundException e) {
492 throw new IOException(e);
493 } catch (InstantiationException e) {
494 throw new IOException(e);
495 } catch (IllegalAccessException e) {
496 throw new IOException(e);
497 }
498 }
499
500 @Override
501 public Service getService() {
502 return this;
503 }
504
505
506
507
508
509
510
511
512
513
514
515 @Override
516 public void start(CoprocessorEnvironment env) throws IOException {
517 if (env instanceof RegionCoprocessorEnvironment) {
518 this.env = (RegionCoprocessorEnvironment)env;
519 } else {
520 throw new CoprocessorException("Must be loaded on a table region!");
521 }
522 }
523
524 @Override
525 public void stop(CoprocessorEnvironment env) throws IOException {
526
527 }
528
529 }