001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; 021 022import com.google.protobuf.ByteString; 023import com.google.protobuf.Message; 024import com.google.protobuf.RpcCallback; 025import com.google.protobuf.RpcController; 026import com.google.protobuf.Service; 027 028import java.io.IOException; 029import java.lang.reflect.InvocationTargetException; 030import java.nio.ByteBuffer; 031import java.util.ArrayList; 032import java.util.Collections; 033import java.util.List; 034import java.util.NavigableSet; 035 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CoprocessorEnvironment; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 040import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 041import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 042import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; 043import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; 044import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; 045import org.apache.hadoop.hbase.regionserver.InternalScanner; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * A concrete AggregateProtocol implementation. Its system level coprocessor 052 * that computes the aggregate function at a region level. 053 * {@link ColumnInterpreter} is used to interpret column value. This class is 054 * parameterized with the following (these are the types with which the {@link ColumnInterpreter} 055 * is parameterized, and for more description on these, refer to {@link ColumnInterpreter}): 056 * @param T Cell value data type 057 * @param S Promoted data type 058 * @param P PB message that is used to transport initializer specific bytes 059 * @param Q PB message that is used to transport Cell (<T>) instance 060 * @param R PB message that is used to transport Promoted (<S>) instance 061 */ 062@InterfaceAudience.Private 063public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message> 064 extends AggregateService implements RegionCoprocessor { 065 protected static final Logger log = LoggerFactory.getLogger(AggregateImplementation.class); 066 private RegionCoprocessorEnvironment env; 067 068 /** 069 * Gives the maximum for a given combination of column qualifier and column 070 * family, in the given row range as defined in the Scan object. In its 071 * current implementation, it takes one column family and one column qualifier 072 * (if provided). In case of null column qualifier, maximum value for the 073 * entire column family will be returned. 074 */ 075 @Override 076 public void getMax(RpcController controller, AggregateRequest request, 077 RpcCallback<AggregateResponse> done) { 078 InternalScanner scanner = null; 079 AggregateResponse response = null; 080 T max = null; 081 try { 082 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); 083 T temp; 084 Scan scan = ProtobufUtil.toScan(request.getScan()); 085 scanner = env.getRegion().getScanner(scan); 086 List<Cell> results = new ArrayList<>(); 087 byte[] colFamily = scan.getFamilies()[0]; 088 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); 089 byte[] qualifier = null; 090 if (qualifiers != null && !qualifiers.isEmpty()) { 091 qualifier = qualifiers.pollFirst(); 092 } 093 // qualifier can be null. 094 boolean hasMoreRows = false; 095 do { 096 hasMoreRows = scanner.next(results); 097 int listSize = results.size(); 098 for (int i = 0; i < listSize; i++) { 099 temp = ci.getValue(colFamily, qualifier, results.get(i)); 100 max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max; 101 } 102 results.clear(); 103 } while (hasMoreRows); 104 if (max != null) { 105 AggregateResponse.Builder builder = AggregateResponse.newBuilder(); 106 builder.addFirstPart(ci.getProtoForCellType(max).toByteString()); 107 response = builder.build(); 108 } 109 } catch (IOException e) { 110 CoprocessorRpcUtils.setControllerException(controller, e); 111 } finally { 112 if (scanner != null) { 113 try { 114 scanner.close(); 115 } catch (IOException ignored) {} 116 } 117 } 118 log.info("Maximum from this region is " 119 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + max); 120 done.run(response); 121 } 122 123 /** 124 * Gives the minimum for a given combination of column qualifier and column 125 * family, in the given row range as defined in the Scan object. In its 126 * current implementation, it takes one column family and one column qualifier 127 * (if provided). In case of null column qualifier, minimum value for the 128 * entire column family will be returned. 129 */ 130 @Override 131 public void getMin(RpcController controller, AggregateRequest request, 132 RpcCallback<AggregateResponse> done) { 133 AggregateResponse response = null; 134 InternalScanner scanner = null; 135 T min = null; 136 try { 137 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); 138 T temp; 139 Scan scan = ProtobufUtil.toScan(request.getScan()); 140 scanner = env.getRegion().getScanner(scan); 141 List<Cell> results = new ArrayList<>(); 142 byte[] colFamily = scan.getFamilies()[0]; 143 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); 144 byte[] qualifier = null; 145 if (qualifiers != null && !qualifiers.isEmpty()) { 146 qualifier = qualifiers.pollFirst(); 147 } 148 boolean hasMoreRows = false; 149 do { 150 hasMoreRows = scanner.next(results); 151 int listSize = results.size(); 152 for (int i = 0; i < listSize; i++) { 153 temp = ci.getValue(colFamily, qualifier, results.get(i)); 154 min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min; 155 } 156 results.clear(); 157 } while (hasMoreRows); 158 if (min != null) { 159 response = AggregateResponse.newBuilder().addFirstPart( 160 ci.getProtoForCellType(min).toByteString()).build(); 161 } 162 } catch (IOException e) { 163 CoprocessorRpcUtils.setControllerException(controller, e); 164 } finally { 165 if (scanner != null) { 166 try { 167 scanner.close(); 168 } catch (IOException ignored) {} 169 } 170 } 171 log.info("Minimum from this region is " 172 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + min); 173 done.run(response); 174 } 175 176 /** 177 * Gives the sum for a given combination of column qualifier and column 178 * family, in the given row range as defined in the Scan object. In its 179 * current implementation, it takes one column family and one column qualifier 180 * (if provided). In case of null column qualifier, sum for the entire column 181 * family will be returned. 182 */ 183 @Override 184 public void getSum(RpcController controller, AggregateRequest request, 185 RpcCallback<AggregateResponse> done) { 186 AggregateResponse response = null; 187 InternalScanner scanner = null; 188 long sum = 0L; 189 try { 190 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); 191 S sumVal = null; 192 T temp; 193 Scan scan = ProtobufUtil.toScan(request.getScan()); 194 scanner = env.getRegion().getScanner(scan); 195 byte[] colFamily = scan.getFamilies()[0]; 196 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); 197 byte[] qualifier = null; 198 if (qualifiers != null && !qualifiers.isEmpty()) { 199 qualifier = qualifiers.pollFirst(); 200 } 201 List<Cell> results = new ArrayList<>(); 202 boolean hasMoreRows = false; 203 do { 204 hasMoreRows = scanner.next(results); 205 int listSize = results.size(); 206 for (int i = 0; i < listSize; i++) { 207 temp = ci.getValue(colFamily, qualifier, results.get(i)); 208 if (temp != null) { 209 sumVal = ci.add(sumVal, ci.castToReturnType(temp)); 210 } 211 } 212 results.clear(); 213 } while (hasMoreRows); 214 if (sumVal != null) { 215 response = AggregateResponse.newBuilder().addFirstPart( 216 ci.getProtoForPromotedType(sumVal).toByteString()).build(); 217 } 218 } catch (IOException e) { 219 CoprocessorRpcUtils.setControllerException(controller, e); 220 } finally { 221 if (scanner != null) { 222 try { 223 scanner.close(); 224 } catch (IOException ignored) {} 225 } 226 } 227 log.debug("Sum from this region is " 228 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum); 229 done.run(response); 230 } 231 232 /** 233 * Gives the row count for the given column family and column qualifier, in 234 * the given row range as defined in the Scan object. 235 */ 236 @Override 237 public void getRowNum(RpcController controller, AggregateRequest request, 238 RpcCallback<AggregateResponse> done) { 239 AggregateResponse response = null; 240 long counter = 0L; 241 List<Cell> results = new ArrayList<>(); 242 InternalScanner scanner = null; 243 try { 244 Scan scan = ProtobufUtil.toScan(request.getScan()); 245 byte[][] colFamilies = scan.getFamilies(); 246 byte[] colFamily = colFamilies != null ? colFamilies[0] : null; 247 NavigableSet<byte[]> qualifiers = colFamilies != null ? 248 scan.getFamilyMap().get(colFamily) : null; 249 byte[] qualifier = null; 250 if (qualifiers != null && !qualifiers.isEmpty()) { 251 qualifier = qualifiers.pollFirst(); 252 } 253 if (scan.getFilter() == null && qualifier == null) { 254 scan.setFilter(new FirstKeyOnlyFilter()); 255 } 256 scanner = env.getRegion().getScanner(scan); 257 boolean hasMoreRows = false; 258 do { 259 hasMoreRows = scanner.next(results); 260 if (results.size() > 0) { 261 counter++; 262 } 263 results.clear(); 264 } while (hasMoreRows); 265 ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter); 266 bb.rewind(); 267 response = AggregateResponse.newBuilder().addFirstPart( 268 ByteString.copyFrom(bb)).build(); 269 } catch (IOException e) { 270 CoprocessorRpcUtils.setControllerException(controller, e); 271 } finally { 272 if (scanner != null) { 273 try { 274 scanner.close(); 275 } catch (IOException ignored) {} 276 } 277 } 278 log.info("Row counter from this region is " 279 + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter); 280 done.run(response); 281 } 282 283 /** 284 * Gives a Pair with first object as Sum and second object as row count, 285 * computed for a given combination of column qualifier and column family in 286 * the given row range as defined in the Scan object. In its current 287 * implementation, it takes one column family and one column qualifier (if 288 * provided). In case of null column qualifier, an aggregate sum over all the 289 * entire column family will be returned. 290 * <p> 291 * The average is computed in 292 * AggregationClient#avg(byte[], ColumnInterpreter, Scan) by 293 * processing results from all regions, so its "ok" to pass sum and a Long 294 * type. 295 */ 296 @Override 297 public void getAvg(RpcController controller, AggregateRequest request, 298 RpcCallback<AggregateResponse> done) { 299 AggregateResponse response = null; 300 InternalScanner scanner = null; 301 try { 302 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); 303 S sumVal = null; 304 Long rowCountVal = 0L; 305 Scan scan = ProtobufUtil.toScan(request.getScan()); 306 scanner = env.getRegion().getScanner(scan); 307 byte[] colFamily = scan.getFamilies()[0]; 308 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); 309 byte[] qualifier = null; 310 if (qualifiers != null && !qualifiers.isEmpty()) { 311 qualifier = qualifiers.pollFirst(); 312 } 313 List<Cell> results = new ArrayList<>(); 314 boolean hasMoreRows = false; 315 316 do { 317 results.clear(); 318 hasMoreRows = scanner.next(results); 319 int listSize = results.size(); 320 for (int i = 0; i < listSize; i++) { 321 sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily, 322 qualifier, results.get(i)))); 323 } 324 rowCountVal++; 325 } while (hasMoreRows); 326 if (sumVal != null) { 327 ByteString first = ci.getProtoForPromotedType(sumVal).toByteString(); 328 AggregateResponse.Builder pair = AggregateResponse.newBuilder(); 329 pair.addFirstPart(first); 330 ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal); 331 bb.rewind(); 332 pair.setSecondPart(ByteString.copyFrom(bb)); 333 response = pair.build(); 334 } 335 } catch (IOException e) { 336 CoprocessorRpcUtils.setControllerException(controller, e); 337 } finally { 338 if (scanner != null) { 339 try { 340 scanner.close(); 341 } catch (IOException ignored) {} 342 } 343 } 344 done.run(response); 345 } 346 347 /** 348 * Gives a Pair with first object a List containing Sum and sum of squares, 349 * and the second object as row count. It is computed for a given combination of 350 * column qualifier and column family in the given row range as defined in the 351 * Scan object. In its current implementation, it takes one column family and 352 * one column qualifier (if provided). The idea is get the value of variance first: 353 * the average of the squares less the square of the average a standard 354 * deviation is square root of variance. 355 */ 356 @Override 357 public void getStd(RpcController controller, AggregateRequest request, 358 RpcCallback<AggregateResponse> done) { 359 InternalScanner scanner = null; 360 AggregateResponse response = null; 361 try { 362 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); 363 S sumVal = null, sumSqVal = null, tempVal = null; 364 long rowCountVal = 0L; 365 Scan scan = ProtobufUtil.toScan(request.getScan()); 366 scanner = env.getRegion().getScanner(scan); 367 byte[] colFamily = scan.getFamilies()[0]; 368 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); 369 byte[] qualifier = null; 370 if (qualifiers != null && !qualifiers.isEmpty()) { 371 qualifier = qualifiers.pollFirst(); 372 } 373 List<Cell> results = new ArrayList<>(); 374 375 boolean hasMoreRows = false; 376 377 do { 378 tempVal = null; 379 hasMoreRows = scanner.next(results); 380 int listSize = results.size(); 381 for (int i = 0; i < listSize; i++) { 382 tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily, 383 qualifier, results.get(i)))); 384 } 385 results.clear(); 386 sumVal = ci.add(sumVal, tempVal); 387 sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal)); 388 rowCountVal++; 389 } while (hasMoreRows); 390 if (sumVal != null) { 391 ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString(); 392 ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString(); 393 AggregateResponse.Builder pair = AggregateResponse.newBuilder(); 394 pair.addFirstPart(first_sumVal); 395 pair.addFirstPart(first_sumSqVal); 396 ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal); 397 bb.rewind(); 398 pair.setSecondPart(ByteString.copyFrom(bb)); 399 response = pair.build(); 400 } 401 } catch (IOException e) { 402 CoprocessorRpcUtils.setControllerException(controller, e); 403 } finally { 404 if (scanner != null) { 405 try { 406 scanner.close(); 407 } catch (IOException ignored) {} 408 } 409 } 410 done.run(response); 411 } 412 413 /** 414 * Gives a List containing sum of values and sum of weights. 415 * It is computed for the combination of column 416 * family and column qualifier(s) in the given row range as defined in the 417 * Scan object. In its current implementation, it takes one column family and 418 * two column qualifiers. The first qualifier is for values column and 419 * the second qualifier (optional) is for weight column. 420 */ 421 @Override 422 public void getMedian(RpcController controller, AggregateRequest request, 423 RpcCallback<AggregateResponse> done) { 424 AggregateResponse response = null; 425 InternalScanner scanner = null; 426 try { 427 ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); 428 S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null; 429 Scan scan = ProtobufUtil.toScan(request.getScan()); 430 scanner = env.getRegion().getScanner(scan); 431 byte[] colFamily = scan.getFamilies()[0]; 432 NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); 433 byte[] valQualifier = null, weightQualifier = null; 434 if (qualifiers != null && !qualifiers.isEmpty()) { 435 valQualifier = qualifiers.pollFirst(); 436 // if weighted median is requested, get qualifier for the weight column 437 weightQualifier = qualifiers.pollLast(); 438 } 439 List<Cell> results = new ArrayList<>(); 440 441 boolean hasMoreRows = false; 442 443 do { 444 tempVal = null; 445 tempWeight = null; 446 hasMoreRows = scanner.next(results); 447 int listSize = results.size(); 448 for (int i = 0; i < listSize; i++) { 449 Cell kv = results.get(i); 450 tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily, 451 valQualifier, kv))); 452 if (weightQualifier != null) { 453 tempWeight = ci.add(tempWeight, 454 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv))); 455 } 456 } 457 results.clear(); 458 sumVal = ci.add(sumVal, tempVal); 459 sumWeights = ci.add(sumWeights, tempWeight); 460 } while (hasMoreRows); 461 ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString(); 462 S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights; 463 ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString(); 464 AggregateResponse.Builder pair = AggregateResponse.newBuilder(); 465 pair.addFirstPart(first_sumVal); 466 pair.addFirstPart(first_sumWeights); 467 response = pair.build(); 468 } catch (IOException e) { 469 CoprocessorRpcUtils.setControllerException(controller, e); 470 } finally { 471 if (scanner != null) { 472 try { 473 scanner.close(); 474 } catch (IOException ignored) {} 475 } 476 } 477 done.run(response); 478 } 479 480 @SuppressWarnings("unchecked") 481 // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO. 482 ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest( 483 AggregateRequest request) throws IOException { 484 String className = request.getInterpreterClassName(); 485 try { 486 ColumnInterpreter<T,S,P,Q,R> ci; 487 Class<?> cls = Class.forName(className); 488 ci = (ColumnInterpreter<T, S, P, Q, R>) cls.getDeclaredConstructor().newInstance(); 489 490 if (request.hasInterpreterSpecificBytes()) { 491 ByteString b = request.getInterpreterSpecificBytes(); 492 P initMsg = getParsedGenericInstance(ci.getClass(), 2, b); 493 ci.initialize(initMsg); 494 } 495 return ci; 496 } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | 497 NoSuchMethodException | InvocationTargetException e) { 498 throw new IOException(e); 499 } 500 } 501 502 @Override 503 public Iterable<Service> getServices() { 504 return Collections.singleton(this); 505 } 506 507 /** 508 * Stores a reference to the coprocessor environment provided by the 509 * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this 510 * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded 511 * on a table region, so always expects this to be an instance of 512 * {@link RegionCoprocessorEnvironment}. 513 * @param env the environment provided by the coprocessor host 514 * @throws IOException if the provided environment is not an instance of 515 * {@code RegionCoprocessorEnvironment} 516 */ 517 @Override 518 public void start(CoprocessorEnvironment env) throws IOException { 519 if (env instanceof RegionCoprocessorEnvironment) { 520 this.env = (RegionCoprocessorEnvironment)env; 521 } else { 522 throw new CoprocessorException("Must be loaded on a table region!"); 523 } 524 } 525 526 @Override 527 public void stop(CoprocessorEnvironment env) throws IOException { 528 // nothing to do 529 } 530}