001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.coprocessor; 019 020import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; 021import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB; 022 023import com.google.protobuf.ByteString; 024import com.google.protobuf.Message; 025import com.google.protobuf.RpcCallback; 026import com.google.protobuf.RpcController; 027 028import java.io.Closeable; 029import java.io.IOException; 030import java.nio.ByteBuffer; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.Map; 034import java.util.NavigableMap; 035import java.util.NavigableSet; 036import java.util.TreeMap; 037import java.util.concurrent.atomic.AtomicLong; 038 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.client.ConnectionFactory; 045import org.apache.hadoop.hbase.client.Result; 046import org.apache.hadoop.hbase.client.ResultScanner; 047import org.apache.hadoop.hbase.client.Scan; 048import org.apache.hadoop.hbase.client.Table; 049import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; 050import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 051import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; 052import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; 053import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.Pair; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060/** 061 * This client class is for invoking the aggregate functions deployed on the 062 * Region Server side via the AggregateService. This class will implement the 063 * supporting functionality for summing/processing the individual results 064 * obtained from the AggregateService for each region. 065 * <p> 066 * This will serve as the client side handler for invoking the aggregate 067 * functions. 068 * For all aggregate functions, 069 * <ul> 070 * <li>start row < end row is an essential condition (if they are not 071 * {@link HConstants#EMPTY_BYTE_ARRAY}) 072 * <li>Column family can't be null. In case where multiple families are 073 * provided, an IOException will be thrown. An optional column qualifier can 074 * also be defined.</li> 075 * <li>For methods to find maximum, minimum, sum, rowcount, it returns the 076 * parameter type. For average and std, it returns a double value. For row 077 * count, it returns a long value.</li> 078 * </ul> 079 * <p>Call {@link #close()} when done. 080 */ 081@InterfaceAudience.Public 082public class AggregationClient implements Closeable { 083 // TODO: This class is not used. Move to examples? 084 private static final Logger log = LoggerFactory.getLogger(AggregationClient.class); 085 private final Connection connection; 086 087 /** 088 * An RpcController implementation for use here in this endpoint. 089 */ 090 static class AggregationClientRpcController implements RpcController { 091 private String errorText; 092 private boolean cancelled = false; 093 private boolean failed = false; 094 095 @Override 096 public String errorText() { 097 return this.errorText; 098 } 099 100 @Override 101 public boolean failed() { 102 return this.failed; 103 } 104 105 @Override 106 public boolean isCanceled() { 107 return this.cancelled; 108 } 109 110 @Override 111 public void notifyOnCancel(RpcCallback<Object> arg0) { 112 throw new UnsupportedOperationException(); 113 } 114 115 @Override 116 public void reset() { 117 this.errorText = null; 118 this.cancelled = false; 119 this.failed = false; 120 } 121 122 @Override 123 public void setFailed(String errorText) { 124 this.failed = true; 125 this.errorText = errorText; 126 } 127 128 @Override 129 public void startCancel() { 130 this.cancelled = true; 131 } 132 } 133 134 /** 135 * Constructor with Conf object 136 * @param cfg Configuration to use 137 */ 138 public AggregationClient(Configuration cfg) { 139 try { 140 // Create a connection on construction. Will use it making each of the calls below. 141 this.connection = ConnectionFactory.createConnection(cfg); 142 } catch (IOException e) { 143 throw new RuntimeException(e); 144 } 145 } 146 147 @Override 148 public void close() throws IOException { 149 if (this.connection != null && !this.connection.isClosed()) { 150 this.connection.close(); 151 } 152 } 153 154 /** 155 * It gives the maximum value of a column for a given column family for the 156 * given range. In case qualifier is null, a max of all values for the given 157 * family is returned. 158 * @param tableName the name of the table to scan 159 * @param ci the user's ColumnInterpreter implementation 160 * @param scan the HBase scan object to use to read data from HBase 161 * @return max val <R> 162 * @throws Throwable The caller is supposed to handle the exception as they are thrown 163 * & propagated to it. 164 */ 165 public <R, S, P extends Message, Q extends Message, T extends Message> R max( 166 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 167 throws Throwable { 168 try (Table table = connection.getTable(tableName)) { 169 return max(table, ci, scan); 170 } 171 } 172 173 /** 174 * It gives the maximum value of a column for a given column family for the 175 * given range. In case qualifier is null, a max of all values for the given 176 * family is returned. 177 * @param table table to scan. 178 * @param ci the user's ColumnInterpreter implementation 179 * @param scan the HBase scan object to use to read data from HBase 180 * @return max val <> 181 * @throws Throwable The caller is supposed to handle the exception as they are thrown 182 * & propagated to it. 183 */ 184 public <R, S, P extends Message, Q extends Message, T extends Message> 185 R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 186 throws Throwable { 187 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); 188 class MaxCallBack implements Batch.Callback<R> { 189 R max = null; 190 191 R getMax() { 192 return max; 193 } 194 195 @Override 196 public synchronized void update(byte[] region, byte[] row, R result) { 197 max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max; 198 } 199 } 200 MaxCallBack aMaxCallBack = new MaxCallBack(); 201 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), 202 new Batch.Call<AggregateService, R>() { 203 @Override 204 public R call(AggregateService instance) throws IOException { 205 RpcController controller = new AggregationClientRpcController(); 206 CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = 207 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 208 instance.getMax(controller, requestArg, rpcCallback); 209 AggregateResponse response = rpcCallback.get(); 210 if (controller.failed()) { 211 throw new IOException(controller.errorText()); 212 } 213 if (response.getFirstPartCount() > 0) { 214 ByteString b = response.getFirstPart(0); 215 Q q = getParsedGenericInstance(ci.getClass(), 3, b); 216 return ci.getCellValueFromProto(q); 217 } 218 return null; 219 } 220 }, aMaxCallBack); 221 return aMaxCallBack.getMax(); 222 } 223 224 /** 225 * It gives the minimum value of a column for a given column family for the 226 * given range. In case qualifier is null, a min of all values for the given 227 * family is returned. 228 * @param tableName the name of the table to scan 229 * @param ci the user's ColumnInterpreter implementation 230 * @param scan the HBase scan object to use to read data from HBase 231 * @return min val <R> 232 * @throws Throwable The caller is supposed to handle the exception as they are thrown 233 * & propagated to it. 234 */ 235 public <R, S, P extends Message, Q extends Message, T extends Message> R min( 236 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 237 throws Throwable { 238 try (Table table = connection.getTable(tableName)) { 239 return min(table, ci, scan); 240 } 241 } 242 243 /** 244 * It gives the minimum value of a column for a given column family for the 245 * given range. In case qualifier is null, a min of all values for the given 246 * family is returned. 247 * @param table table to scan. 248 * @param ci the user's ColumnInterpreter implementation 249 * @param scan the HBase scan object to use to read data from HBase 250 * @return min val <R> 251 * @throws Throwable The caller is supposed to handle the exception as they are thrown 252 * & propagated to it. 253 */ 254 public <R, S, P extends Message, Q extends Message, T extends Message> 255 R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 256 throws Throwable { 257 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); 258 class MinCallBack implements Batch.Callback<R> { 259 private R min = null; 260 261 public R getMinimum() { 262 return min; 263 } 264 265 @Override 266 public synchronized void update(byte[] region, byte[] row, R result) { 267 min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min; 268 } 269 } 270 271 MinCallBack minCallBack = new MinCallBack(); 272 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), 273 new Batch.Call<AggregateService, R>() { 274 @Override 275 public R call(AggregateService instance) throws IOException { 276 RpcController controller = new AggregationClientRpcController(); 277 CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = 278 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 279 instance.getMin(controller, requestArg, rpcCallback); 280 AggregateResponse response = rpcCallback.get(); 281 if (controller.failed()) { 282 throw new IOException(controller.errorText()); 283 } 284 if (response.getFirstPartCount() > 0) { 285 ByteString b = response.getFirstPart(0); 286 Q q = getParsedGenericInstance(ci.getClass(), 3, b); 287 return ci.getCellValueFromProto(q); 288 } 289 return null; 290 } 291 }, minCallBack); 292 log.debug("Min fom all regions is: " + minCallBack.getMinimum()); 293 return minCallBack.getMinimum(); 294 } 295 296 /** 297 * It gives the row count, by summing up the individual results obtained from 298 * regions. In case the qualifier is null, FirstKeyValueFilter is used to 299 * optimised the operation. In case qualifier is provided, I can't use the 300 * filter as it may set the flag to skip to next row, but the value read is 301 * not of the given filter: in this case, this particular row will not be 302 * counted ==> an error. 303 * @param tableName the name of the table to scan 304 * @param ci the user's ColumnInterpreter implementation 305 * @param scan the HBase scan object to use to read data from HBase 306 * @return <R, S> 307 * @throws Throwable The caller is supposed to handle the exception as they are thrown 308 * & propagated to it. 309 */ 310 public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount( 311 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 312 throws Throwable { 313 try (Table table = connection.getTable(tableName)) { 314 return rowCount(table, ci, scan); 315 } 316 } 317 318 /** 319 * It gives the row count, by summing up the individual results obtained from 320 * regions. In case the qualifier is null, FirstKeyValueFilter is used to 321 * optimised the operation. In case qualifier is provided, I can't use the 322 * filter as it may set the flag to skip to next row, but the value read is 323 * not of the given filter: in this case, this particular row will not be 324 * counted ==> an error. 325 * @param table table to scan. 326 * @param ci the user's ColumnInterpreter implementation 327 * @param scan the HBase scan object to use to read data from HBase 328 * @return <R, S> 329 * @throws Throwable The caller is supposed to handle the exception as they are thrown 330 * & propagated to it. 331 */ 332 public <R, S, P extends Message, Q extends Message, T extends Message> 333 long rowCount(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 334 throws Throwable { 335 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true); 336 class RowNumCallback implements Batch.Callback<Long> { 337 private final AtomicLong rowCountL = new AtomicLong(0); 338 339 public long getRowNumCount() { 340 return rowCountL.get(); 341 } 342 343 @Override 344 public void update(byte[] region, byte[] row, Long result) { 345 rowCountL.addAndGet(result.longValue()); 346 } 347 } 348 349 RowNumCallback rowNum = new RowNumCallback(); 350 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), 351 new Batch.Call<AggregateService, Long>() { 352 @Override 353 public Long call(AggregateService instance) throws IOException { 354 RpcController controller = new AggregationClientRpcController(); 355 CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = 356 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 357 instance.getRowNum(controller, requestArg, rpcCallback); 358 AggregateResponse response = rpcCallback.get(); 359 if (controller.failed()) { 360 throw new IOException(controller.errorText()); 361 } 362 byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); 363 ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); 364 bb.rewind(); 365 return bb.getLong(); 366 } 367 }, rowNum); 368 return rowNum.getRowNumCount(); 369 } 370 371 /** 372 * It sums up the value returned from various regions. In case qualifier is 373 * null, summation of all the column qualifiers in the given family is done. 374 * @param tableName the name of the table to scan 375 * @param ci the user's ColumnInterpreter implementation 376 * @param scan the HBase scan object to use to read data from HBase 377 * @return sum <S> 378 * @throws Throwable The caller is supposed to handle the exception as they are thrown 379 * & propagated to it. 380 */ 381 public <R, S, P extends Message, Q extends Message, T extends Message> S sum( 382 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 383 throws Throwable { 384 try (Table table = connection.getTable(tableName)) { 385 return sum(table, ci, scan); 386 } 387 } 388 389 /** 390 * It sums up the value returned from various regions. In case qualifier is 391 * null, summation of all the column qualifiers in the given family is done. 392 * @param table table to scan. 393 * @param ci the user's ColumnInterpreter implementation 394 * @param scan the HBase scan object to use to read data from HBase 395 * @return sum <S> 396 * @throws Throwable The caller is supposed to handle the exception as they are thrown 397 * & propagated to it. 398 */ 399 public <R, S, P extends Message, Q extends Message, T extends Message> 400 S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 401 throws Throwable { 402 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); 403 404 class SumCallBack implements Batch.Callback<S> { 405 S sumVal = null; 406 407 public S getSumResult() { 408 return sumVal; 409 } 410 411 @Override 412 public synchronized void update(byte[] region, byte[] row, S result) { 413 sumVal = ci.add(sumVal, result); 414 } 415 } 416 SumCallBack sumCallBack = new SumCallBack(); 417 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), 418 new Batch.Call<AggregateService, S>() { 419 @Override 420 public S call(AggregateService instance) throws IOException { 421 RpcController controller = new AggregationClientRpcController(); 422 // Not sure what is going on here why I have to do these casts. TODO. 423 CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = 424 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 425 instance.getSum(controller, requestArg, rpcCallback); 426 AggregateResponse response = rpcCallback.get(); 427 if (controller.failed()) { 428 throw new IOException(controller.errorText()); 429 } 430 if (response.getFirstPartCount() == 0) { 431 return null; 432 } 433 ByteString b = response.getFirstPart(0); 434 T t = getParsedGenericInstance(ci.getClass(), 4, b); 435 S s = ci.getPromotedValueFromProto(t); 436 return s; 437 } 438 }, sumCallBack); 439 return sumCallBack.getSumResult(); 440 } 441 442 /** 443 * It computes average while fetching sum and row count from all the 444 * corresponding regions. Approach is to compute a global sum of region level 445 * sum and rowcount and then compute the average. 446 * @param tableName the name of the table to scan 447 * @param scan the HBase scan object to use to read data from HBase 448 * @throws Throwable The caller is supposed to handle the exception as they are thrown 449 * & propagated to it. 450 */ 451 private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs( 452 final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) 453 throws Throwable { 454 try (Table table = connection.getTable(tableName)) { 455 return getAvgArgs(table, ci, scan); 456 } 457 } 458 459 /** 460 * It computes average while fetching sum and row count from all the 461 * corresponding regions. Approach is to compute a global sum of region level 462 * sum and rowcount and then compute the average. 463 * @param table table to scan. 464 * @param scan the HBase scan object to use to read data from HBase 465 * @throws Throwable The caller is supposed to handle the exception as they are thrown 466 * & propagated to it. 467 */ 468 private <R, S, P extends Message, Q extends Message, T extends Message> 469 Pair<S, Long> getAvgArgs(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, 470 final Scan scan) throws Throwable { 471 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); 472 class AvgCallBack implements Batch.Callback<Pair<S, Long>> { 473 S sum = null; 474 Long rowCount = 0L; 475 476 public synchronized Pair<S, Long> getAvgArgs() { 477 return new Pair<>(sum, rowCount); 478 } 479 480 @Override 481 public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) { 482 sum = ci.add(sum, result.getFirst()); 483 rowCount += result.getSecond(); 484 } 485 } 486 487 AvgCallBack avgCallBack = new AvgCallBack(); 488 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), 489 new Batch.Call<AggregateService, Pair<S, Long>>() { 490 @Override 491 public Pair<S, Long> call(AggregateService instance) throws IOException { 492 RpcController controller = new AggregationClientRpcController(); 493 CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = 494 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 495 instance.getAvg(controller, requestArg, rpcCallback); 496 AggregateResponse response = rpcCallback.get(); 497 if (controller.failed()) { 498 throw new IOException(controller.errorText()); 499 } 500 Pair<S, Long> pair = new Pair<>(null, 0L); 501 if (response.getFirstPartCount() == 0) { 502 return pair; 503 } 504 ByteString b = response.getFirstPart(0); 505 T t = getParsedGenericInstance(ci.getClass(), 4, b); 506 S s = ci.getPromotedValueFromProto(t); 507 pair.setFirst(s); 508 ByteBuffer bb = ByteBuffer.allocate(8).put( 509 getBytesFromResponse(response.getSecondPart())); 510 bb.rewind(); 511 pair.setSecond(bb.getLong()); 512 return pair; 513 } 514 }, avgCallBack); 515 return avgCallBack.getAvgArgs(); 516 } 517 518 /** 519 * This is the client side interface/handle for calling the average method for 520 * a given cf-cq combination. It was necessary to add one more call stack as 521 * its return type should be a decimal value, irrespective of what 522 * columninterpreter says. So, this methods collects the necessary parameters 523 * to compute the average and returs the double value. 524 * @param tableName the name of the table to scan 525 * @param ci the user's ColumnInterpreter implementation 526 * @param scan the HBase scan object to use to read data from HBase 527 * @return <R, S> 528 * @throws Throwable The caller is supposed to handle the exception as they are thrown 529 * & propagated to it. 530 */ 531 public <R, S, P extends Message, Q extends Message, T extends Message> 532 double avg(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, 533 Scan scan) throws Throwable { 534 Pair<S, Long> p = getAvgArgs(tableName, ci, scan); 535 return ci.divideForAvg(p.getFirst(), p.getSecond()); 536 } 537 538 /** 539 * This is the client side interface/handle for calling the average method for 540 * a given cf-cq combination. It was necessary to add one more call stack as 541 * its return type should be a decimal value, irrespective of what 542 * columninterpreter says. So, this methods collects the necessary parameters 543 * to compute the average and returs the double value. 544 * @param table table to scan. 545 * @param ci the user's ColumnInterpreter implementation 546 * @param scan the HBase scan object to use to read data from HBase 547 * @return <R, S> 548 * @throws Throwable The caller is supposed to handle the exception as they are thrown 549 * & propagated to it. 550 */ 551 public <R, S, P extends Message, Q extends Message, T extends Message> double avg( 552 final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) 553 throws Throwable { 554 Pair<S, Long> p = getAvgArgs(table, ci, scan); 555 return ci.divideForAvg(p.getFirst(), p.getSecond()); 556 } 557 558 /** 559 * It computes a global standard deviation for a given column and its value. 560 * Standard deviation is square root of (average of squares - 561 * average*average). From individual regions, it obtains sum, square sum and 562 * number of rows. With these, the above values are computed to get the global 563 * std. 564 * @param table table to scan. 565 * @param scan the HBase scan object to use to read data from HBase 566 * @return standard deviations 567 * @throws Throwable The caller is supposed to handle the exception as they are thrown 568 * & propagated to it. 569 */ 570 private <R, S, P extends Message, Q extends Message, T extends Message> 571 Pair<List<S>, Long> getStdArgs(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, 572 final Scan scan) throws Throwable { 573 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); 574 class StdCallback implements Batch.Callback<Pair<List<S>, Long>> { 575 long rowCountVal = 0L; 576 S sumVal = null, sumSqVal = null; 577 578 public synchronized Pair<List<S>, Long> getStdParams() { 579 List<S> l = new ArrayList<>(2); 580 l.add(sumVal); 581 l.add(sumSqVal); 582 Pair<List<S>, Long> p = new Pair<>(l, rowCountVal); 583 return p; 584 } 585 586 @Override 587 public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) { 588 if (result.getFirst().size() > 0) { 589 sumVal = ci.add(sumVal, result.getFirst().get(0)); 590 sumSqVal = ci.add(sumSqVal, result.getFirst().get(1)); 591 rowCountVal += result.getSecond(); 592 } 593 } 594 } 595 596 StdCallback stdCallback = new StdCallback(); 597 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), 598 new Batch.Call<AggregateService, Pair<List<S>, Long>>() { 599 @Override 600 public Pair<List<S>, Long> call(AggregateService instance) throws IOException { 601 RpcController controller = new AggregationClientRpcController(); 602 CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = 603 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 604 instance.getStd(controller, requestArg, rpcCallback); 605 AggregateResponse response = rpcCallback.get(); 606 if (controller.failed()) { 607 throw new IOException(controller.errorText()); 608 } 609 Pair<List<S>, Long> pair = new Pair<>(new ArrayList<>(), 0L); 610 if (response.getFirstPartCount() == 0) { 611 return pair; 612 } 613 List<S> list = new ArrayList<>(); 614 for (int i = 0; i < response.getFirstPartCount(); i++) { 615 ByteString b = response.getFirstPart(i); 616 T t = getParsedGenericInstance(ci.getClass(), 4, b); 617 S s = ci.getPromotedValueFromProto(t); 618 list.add(s); 619 } 620 pair.setFirst(list); 621 ByteBuffer bb = ByteBuffer.allocate(8).put( 622 getBytesFromResponse(response.getSecondPart())); 623 bb.rewind(); 624 pair.setSecond(bb.getLong()); 625 return pair; 626 } 627 }, stdCallback); 628 return stdCallback.getStdParams(); 629 } 630 631 /** 632 * This is the client side interface/handle for calling the std method for a 633 * given cf-cq combination. It was necessary to add one more call stack as its 634 * return type should be a decimal value, irrespective of what 635 * columninterpreter says. So, this methods collects the necessary parameters 636 * to compute the std and returns the double value. 637 * @param tableName the name of the table to scan 638 * @param ci the user's ColumnInterpreter implementation 639 * @param scan the HBase scan object to use to read data from HBase 640 * @return <R, S> 641 * @throws Throwable The caller is supposed to handle the exception as they are thrown 642 * & propagated to it. 643 */ 644 public <R, S, P extends Message, Q extends Message, T extends Message> 645 double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, 646 Scan scan) throws Throwable { 647 try (Table table = connection.getTable(tableName)) { 648 return std(table, ci, scan); 649 } 650 } 651 652 /** 653 * This is the client side interface/handle for calling the std method for a 654 * given cf-cq combination. It was necessary to add one more call stack as its 655 * return type should be a decimal value, irrespective of what 656 * columninterpreter says. So, this methods collects the necessary parameters 657 * to compute the std and returns the double value. 658 * @param table table to scan. 659 * @param ci the user's ColumnInterpreter implementation 660 * @param scan the HBase scan object to use to read data from HBase 661 * @return <R, S> 662 * @throws Throwable The caller is supposed to handle the exception as they are thrown 663 * & propagated to it. 664 */ 665 public <R, S, P extends Message, Q extends Message, T extends Message> double std( 666 final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { 667 Pair<List<S>, Long> p = getStdArgs(table, ci, scan); 668 double res = 0d; 669 double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond()); 670 double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond()); 671 res = avgOfSumSq - (avg) * (avg); // variance 672 res = Math.pow(res, 0.5); 673 return res; 674 } 675 676 /** 677 * It helps locate the region with median for a given column whose weight 678 * is specified in an optional column. 679 * From individual regions, it obtains sum of values and sum of weights. 680 * @param table table to scan. 681 * @param ci the user's ColumnInterpreter implementation 682 * @param scan the HBase scan object to use to read data from HBase 683 * @return pair whose first element is a map between start row of the region 684 * and (sum of values, sum of weights) for the region, the second element is 685 * (sum of values, sum of weights) for all the regions chosen 686 * @throws Throwable The caller is supposed to handle the exception as they are thrown 687 * & propagated to it. 688 */ 689 private <R, S, P extends Message, Q extends Message, T extends Message> 690 Pair<NavigableMap<byte[], List<S>>, List<S>> 691 getMedianArgs(final Table table, 692 final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { 693 final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); 694 final NavigableMap<byte[], List<S>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 695 class StdCallback implements Batch.Callback<List<S>> { 696 S sumVal = null, sumWeights = null; 697 698 public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() { 699 List<S> l = new ArrayList<>(2); 700 l.add(sumVal); 701 l.add(sumWeights); 702 Pair<NavigableMap<byte[], List<S>>, List<S>> p = new Pair<>(map, l); 703 return p; 704 } 705 706 @Override 707 public synchronized void update(byte[] region, byte[] row, List<S> result) { 708 map.put(row, result); 709 sumVal = ci.add(sumVal, result.get(0)); 710 sumWeights = ci.add(sumWeights, result.get(1)); 711 } 712 } 713 StdCallback stdCallback = new StdCallback(); 714 table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), 715 new Batch.Call<AggregateService, List<S>>() { 716 @Override 717 public List<S> call(AggregateService instance) throws IOException { 718 RpcController controller = new AggregationClientRpcController(); 719 CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = 720 new CoprocessorRpcUtils.BlockingRpcCallback<>(); 721 instance.getMedian(controller, requestArg, rpcCallback); 722 AggregateResponse response = rpcCallback.get(); 723 if (controller.failed()) { 724 throw new IOException(controller.errorText()); 725 } 726 727 List<S> list = new ArrayList<>(); 728 for (int i = 0; i < response.getFirstPartCount(); i++) { 729 ByteString b = response.getFirstPart(i); 730 T t = getParsedGenericInstance(ci.getClass(), 4, b); 731 S s = ci.getPromotedValueFromProto(t); 732 list.add(s); 733 } 734 return list; 735 } 736 737 }, stdCallback); 738 return stdCallback.getMedianParams(); 739 } 740 741 /** 742 * This is the client side interface/handler for calling the median method for a 743 * given cf-cq combination. This method collects the necessary parameters 744 * to compute the median and returns the median. 745 * @param tableName the name of the table to scan 746 * @param ci the user's ColumnInterpreter implementation 747 * @param scan the HBase scan object to use to read data from HBase 748 * @return R the median 749 * @throws Throwable The caller is supposed to handle the exception as they are thrown 750 * & propagated to it. 751 */ 752 public <R, S, P extends Message, Q extends Message, T extends Message> 753 R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, 754 Scan scan) throws Throwable { 755 try (Table table = connection.getTable(tableName)) { 756 return median(table, ci, scan); 757 } 758 } 759 760 /** 761 * This is the client side interface/handler for calling the median method for a 762 * given cf-cq combination. This method collects the necessary parameters 763 * to compute the median and returns the median. 764 * @param table table to scan. 765 * @param ci the user's ColumnInterpreter implementation 766 * @param scan the HBase scan object to use to read data from HBase 767 * @return R the median 768 * @throws Throwable The caller is supposed to handle the exception as they are thrown 769 * & propagated to it. 770 */ 771 public <R, S, P extends Message, Q extends Message, T extends Message> 772 R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { 773 Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan); 774 byte[] startRow = null; 775 byte[] colFamily = scan.getFamilies()[0]; 776 NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily); 777 NavigableMap<byte[], List<S>> map = p.getFirst(); 778 S sumVal = p.getSecond().get(0); 779 S sumWeights = p.getSecond().get(1); 780 double halfSumVal = ci.divideForAvg(sumVal, 2L); 781 double movingSumVal = 0; 782 boolean weighted = false; 783 if (quals.size() > 1) { 784 weighted = true; 785 halfSumVal = ci.divideForAvg(sumWeights, 2L); 786 } 787 788 for (Map.Entry<byte[], List<S>> entry : map.entrySet()) { 789 S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0); 790 double newSumVal = movingSumVal + ci.divideForAvg(s, 1L); 791 if (newSumVal > halfSumVal) { 792 // we found the region with the median 793 break; 794 } 795 movingSumVal = newSumVal; 796 startRow = entry.getKey(); 797 } 798 // scan the region with median and find it 799 Scan scan2 = new Scan(scan); 800 // inherit stop row from method parameter 801 if (startRow != null) { 802 scan2.setStartRow(startRow); 803 } 804 ResultScanner scanner = null; 805 try { 806 int cacheSize = scan2.getCaching(); 807 if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) { 808 scan2.setCacheBlocks(true); 809 cacheSize = 5; 810 scan2.setCaching(cacheSize); 811 } 812 scanner = table.getScanner(scan2); 813 Result[] results = null; 814 byte[] qualifier = quals.pollFirst(); 815 // qualifier for the weight column 816 byte[] weightQualifier = weighted ? quals.pollLast() : qualifier; 817 R value = null; 818 do { 819 results = scanner.next(cacheSize); 820 if (results != null && results.length > 0) { 821 for (int i = 0; i < results.length; i++) { 822 Result r = results[i]; 823 // retrieve weight 824 Cell kv = r.getColumnLatestCell(colFamily, weightQualifier); 825 R newValue = ci.getValue(colFamily, weightQualifier, kv); 826 S s = ci.castToReturnType(newValue); 827 double newSumVal = movingSumVal + ci.divideForAvg(s, 1L); 828 // see if we have moved past the median 829 if (newSumVal > halfSumVal) { 830 return value; 831 } 832 movingSumVal = newSumVal; 833 kv = r.getColumnLatestCell(colFamily, qualifier); 834 value = ci.getValue(colFamily, qualifier, kv); 835 } 836 } 837 } while (results != null && results.length > 0); 838 } finally { 839 if (scanner != null) { 840 scanner.close(); 841 } 842 } 843 return null; 844 } 845 846 byte[] getBytesFromResponse(ByteString response) { 847 ByteBuffer bb = response.asReadOnlyByteBuffer(); 848 bb.rewind(); 849 byte[] bytes; 850 if (bb.hasArray()) { 851 bytes = bb.array(); 852 } else { 853 bytes = response.toByteArray(); 854 } 855 return bytes; 856 } 857}