001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import com.google.protobuf.Message; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Set; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CellUtil; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Get; 045import org.apache.hadoop.hbase.client.IsolationLevel; 046import org.apache.hadoop.hbase.client.Mutation; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Scan; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient; 051import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos; 052import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest; 053import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse; 054import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest; 055import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse; 056import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest; 057import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse; 058import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest; 059import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse; 060import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 061import org.apache.hadoop.hbase.ipc.RpcScheduler; 062import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest; 063import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse; 064import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService; 065import org.apache.hadoop.hbase.regionserver.BaseRowProcessor; 066import org.apache.hadoop.hbase.regionserver.HRegion; 067import org.apache.hadoop.hbase.regionserver.InternalScanner; 068import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 069import org.apache.hadoop.hbase.testclassification.MediumTests; 070import org.apache.hadoop.hbase.util.ByteStringer; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.wal.WALEdit; 073import org.junit.AfterClass; 074import org.junit.BeforeClass; 075import org.junit.ClassRule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081/** 082 * Verifies ProcessEndpoint works. 083 * The tested RowProcessor performs two scans and a read-modify-write. 084 */ 085@Category({CoprocessorTests.class, MediumTests.class}) 086public class TestRowProcessorEndpoint { 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestRowProcessorEndpoint.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestRowProcessorEndpoint.class); 092 093 private static final TableName TABLE = TableName.valueOf("testtable"); 094 private final static byte[] ROW = Bytes.toBytes("testrow"); 095 private final static byte[] ROW2 = Bytes.toBytes("testrow2"); 096 private final static byte[] FAM = Bytes.toBytes("friendlist"); 097 098 // Column names 099 private final static byte[] A = Bytes.toBytes("a"); 100 private final static byte[] B = Bytes.toBytes("b"); 101 private final static byte[] C = Bytes.toBytes("c"); 102 private final static byte[] D = Bytes.toBytes("d"); 103 private final static byte[] E = Bytes.toBytes("e"); 104 private final static byte[] F = Bytes.toBytes("f"); 105 private final static byte[] G = Bytes.toBytes("g"); 106 private final static byte[] COUNTER = Bytes.toBytes("counter"); 107 private final static AtomicLong myTimer = new AtomicLong(0); 108 private final AtomicInteger failures = new AtomicInteger(0); 109 110 private static HBaseTestingUtility util = new HBaseTestingUtility(); 111 private static volatile int expectedCounter = 0; 112 private static int rowSize, row2Size; 113 114 private volatile static Table table = null; 115 private volatile static boolean swapped = false; 116 private volatile CountDownLatch startSignal; 117 private volatile CountDownLatch doneSignal; 118 119 @BeforeClass 120 public static void setupBeforeClass() throws Exception { 121 Configuration conf = util.getConfiguration(); 122 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 123 RowProcessorEndpoint.class.getName()); 124 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 125 conf.setLong("hbase.hregion.row.processor.timeout", 1000L); 126 conf.setLong(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 2048); 127 util.startMiniCluster(); 128 } 129 130 @AfterClass 131 public static void tearDownAfterClass() throws Exception { 132 util.shutdownMiniCluster(); 133 } 134 135 public void prepareTestData() throws Exception { 136 try { 137 util.getAdmin().disableTable(TABLE); 138 util.getAdmin().deleteTable(TABLE); 139 } catch (Exception e) { 140 // ignore table not found 141 } 142 table = util.createTable(TABLE, FAM); 143 { 144 Put put = new Put(ROW); 145 put.addColumn(FAM, A, Bytes.add(B, C)); // B, C are friends of A 146 put.addColumn(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B 147 put.addColumn(FAM, C, G); // G is a friend of C 148 table.put(put); 149 rowSize = put.size(); 150 } 151 Put put = new Put(ROW2); 152 put.addColumn(FAM, D, E); 153 put.addColumn(FAM, F, G); 154 table.put(put); 155 row2Size = put.size(); 156 } 157 158 @Test 159 public void testDoubleScan() throws Throwable { 160 prepareTestData(); 161 162 CoprocessorRpcChannel channel = table.coprocessorService(ROW); 163 RowProcessorEndpoint.FriendsOfFriendsProcessor processor = 164 new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A); 165 RowProcessorService.BlockingInterface service = 166 RowProcessorService.newBlockingStub(channel); 167 ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor); 168 ProcessResponse protoResult = service.process(null, request); 169 FriendsOfFriendsProcessorResponse response = 170 FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult()); 171 Set<String> result = new HashSet<>(); 172 result.addAll(response.getResultList()); 173 Set<String> expected = new HashSet<>(Arrays.asList(new String[]{"d", "e", "f", "g"})); 174 Get get = new Get(ROW); 175 LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells())); 176 assertEquals(expected, result); 177 } 178 179 @Test 180 public void testReadModifyWrite() throws Throwable { 181 prepareTestData(); 182 failures.set(0); 183 int numThreads = 100; 184 concurrentExec(new IncrementRunner(), numThreads); 185 Get get = new Get(ROW); 186 LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells())); 187 int finalCounter = incrementCounter(table); 188 int failureNumber = failures.get(); 189 if (failureNumber > 0) { 190 LOG.debug("We failed " + failureNumber + " times during test"); 191 } 192 assertEquals(numThreads + 1 - failureNumber, finalCounter); 193 } 194 195 class IncrementRunner implements Runnable { 196 @Override 197 public void run() { 198 try { 199 incrementCounter(table); 200 } catch (Throwable e) { 201 failures.incrementAndGet(); 202 e.printStackTrace(); 203 } 204 } 205 } 206 207 private int incrementCounter(Table table) throws Throwable { 208 CoprocessorRpcChannel channel = table.coprocessorService(ROW); 209 RowProcessorEndpoint.IncrementCounterProcessor processor = 210 new RowProcessorEndpoint.IncrementCounterProcessor(ROW); 211 RowProcessorService.BlockingInterface service = 212 RowProcessorService.newBlockingStub(channel); 213 ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor); 214 ProcessResponse protoResult = service.process(null, request); 215 IncCounterProcessorResponse response = IncCounterProcessorResponse 216 .parseFrom(protoResult.getRowProcessorResult()); 217 Integer result = response.getResponse(); 218 return result; 219 } 220 221 private void concurrentExec(final Runnable task, final int numThreads) throws Throwable { 222 startSignal = new CountDownLatch(numThreads); 223 doneSignal = new CountDownLatch(numThreads); 224 for (int i = 0; i < numThreads; ++i) { 225 new Thread(new Runnable() { 226 @Override 227 public void run() { 228 try { 229 startSignal.countDown(); 230 startSignal.await(); 231 task.run(); 232 } catch (Throwable e) { 233 failures.incrementAndGet(); 234 e.printStackTrace(); 235 } 236 doneSignal.countDown(); 237 } 238 }).start(); 239 } 240 doneSignal.await(); 241 } 242 243 @Test 244 public void testMultipleRows() throws Throwable { 245 prepareTestData(); 246 failures.set(0); 247 int numThreads = 100; 248 concurrentExec(new SwapRowsRunner(), numThreads); 249 LOG.debug("row keyvalues:" + 250 stringifyKvs(table.get(new Get(ROW)).listCells())); 251 LOG.debug("row2 keyvalues:" + 252 stringifyKvs(table.get(new Get(ROW2)).listCells())); 253 int failureNumber = failures.get(); 254 if (failureNumber > 0) { 255 LOG.debug("We failed " + failureNumber + " times during test"); 256 } 257 if (!swapped) { 258 assertEquals(rowSize, table.get(new Get(ROW)).listCells().size()); 259 assertEquals(row2Size, table.get(new Get(ROW2)).listCells().size()); 260 } else { 261 assertEquals(rowSize, table.get(new Get(ROW2)).listCells().size()); 262 assertEquals(row2Size, table.get(new Get(ROW)).listCells().size()); 263 } 264 } 265 266 class SwapRowsRunner implements Runnable { 267 @Override 268 public void run() { 269 try { 270 swapRows(table); 271 } catch (Throwable e) { 272 failures.incrementAndGet(); 273 e.printStackTrace(); 274 } 275 } 276 } 277 278 private void swapRows(Table table) throws Throwable { 279 CoprocessorRpcChannel channel = table.coprocessorService(ROW); 280 RowProcessorEndpoint.RowSwapProcessor processor = 281 new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2); 282 RowProcessorService.BlockingInterface service = 283 RowProcessorService.newBlockingStub(channel); 284 ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor); 285 service.process(null, request); 286 } 287 288 @Test 289 public void testTimeout() throws Throwable { 290 prepareTestData(); 291 CoprocessorRpcChannel channel = table.coprocessorService(ROW); 292 RowProcessorEndpoint.TimeoutProcessor processor = 293 new RowProcessorEndpoint.TimeoutProcessor(ROW); 294 RowProcessorService.BlockingInterface service = 295 RowProcessorService.newBlockingStub(channel); 296 ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor); 297 boolean exceptionCaught = false; 298 try { 299 service.process(null, request); 300 } catch (Exception e) { 301 exceptionCaught = true; 302 } 303 assertTrue(exceptionCaught); 304 } 305 306 /** 307 * This class defines two RowProcessors: 308 * IncrementCounterProcessor and FriendsOfFriendsProcessor. 309 * 310 * We define the RowProcessors as the inner class of the endpoint. 311 * So they can be loaded with the endpoint on the coprocessor. 312 */ 313 public static class RowProcessorEndpoint<S extends Message,T extends Message> 314 extends BaseRowProcessorEndpoint<S,T> { 315 public static class IncrementCounterProcessor extends 316 BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest, 317 IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> { 318 int counter = 0; 319 byte[] row = new byte[0]; 320 321 /** 322 * Empty constructor for Writable 323 */ 324 IncrementCounterProcessor() { 325 } 326 327 IncrementCounterProcessor(byte[] row) { 328 this.row = row; 329 } 330 331 @Override 332 public Collection<byte[]> getRowsToLock() { 333 return Collections.singleton(row); 334 } 335 336 @Override 337 public IncCounterProcessorResponse getResult() { 338 IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder(); 339 i.setResponse(counter); 340 return i.build(); 341 } 342 343 @Override 344 public boolean readOnly() { 345 return false; 346 } 347 348 @Override 349 public void process(long now, HRegion region, 350 List<Mutation> mutations, WALEdit walEdit) throws IOException { 351 // Scan current counter 352 List<Cell> kvs = new ArrayList<>(); 353 Scan scan = new Scan(row, row); 354 scan.addColumn(FAM, COUNTER); 355 doScan(region, scan, kvs); 356 counter = kvs.isEmpty() ? 0 : 357 Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next())); 358 359 // Assert counter value 360 assertEquals(expectedCounter, counter); 361 362 // Increment counter and send it to both memstore and wal edit 363 counter += 1; 364 expectedCounter += 1; 365 366 367 Put p = new Put(row); 368 KeyValue kv = 369 new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter)); 370 p.add(kv); 371 mutations.add(p); 372 walEdit.add(kv); 373 374 // We can also inject some meta data to the walEdit 375 KeyValue metaKv = new KeyValue( 376 row, WALEdit.METAFAMILY, 377 Bytes.toBytes("I just increment counter"), 378 Bytes.toBytes(counter)); 379 walEdit.add(metaKv); 380 } 381 382 @Override 383 public IncCounterProcessorRequest getRequestData() throws IOException { 384 IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder(); 385 builder.setCounter(counter); 386 builder.setRow(ByteStringer.wrap(row)); 387 return builder.build(); 388 } 389 390 @Override 391 public void initialize(IncCounterProcessorRequest msg) { 392 this.row = msg.getRow().toByteArray(); 393 this.counter = msg.getCounter(); 394 } 395 } 396 397 public static class FriendsOfFriendsProcessor extends 398 BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> { 399 byte[] row = null; 400 byte[] person = null; 401 final Set<String> result = new HashSet<>(); 402 403 /** 404 * Empty constructor for Writable 405 */ 406 FriendsOfFriendsProcessor() { 407 } 408 409 FriendsOfFriendsProcessor(byte[] row, byte[] person) { 410 this.row = row; 411 this.person = person; 412 } 413 414 @Override 415 public Collection<byte[]> getRowsToLock() { 416 return Collections.singleton(row); 417 } 418 419 @Override 420 public FriendsOfFriendsProcessorResponse getResult() { 421 FriendsOfFriendsProcessorResponse.Builder builder = 422 FriendsOfFriendsProcessorResponse.newBuilder(); 423 builder.addAllResult(result); 424 return builder.build(); 425 } 426 427 @Override 428 public boolean readOnly() { 429 return true; 430 } 431 432 @Override 433 public void process(long now, HRegion region, 434 List<Mutation> mutations, WALEdit walEdit) throws IOException { 435 List<Cell> kvs = new ArrayList<>(); 436 { // First scan to get friends of the person 437 Scan scan = new Scan(row, row); 438 scan.addColumn(FAM, person); 439 doScan(region, scan, kvs); 440 } 441 442 // Second scan to get friends of friends 443 Scan scan = new Scan(row, row); 444 for (Cell kv : kvs) { 445 byte[] friends = CellUtil.cloneValue(kv); 446 for (byte f : friends) { 447 scan.addColumn(FAM, new byte[]{f}); 448 } 449 } 450 doScan(region, scan, kvs); 451 452 // Collect result 453 result.clear(); 454 for (Cell kv : kvs) { 455 for (byte b : CellUtil.cloneValue(kv)) { 456 result.add((char)b + ""); 457 } 458 } 459 } 460 461 @Override 462 public FriendsOfFriendsProcessorRequest getRequestData() throws IOException { 463 FriendsOfFriendsProcessorRequest.Builder builder = 464 FriendsOfFriendsProcessorRequest.newBuilder(); 465 builder.setPerson(ByteStringer.wrap(person)); 466 builder.setRow(ByteStringer.wrap(row)); 467 builder.addAllResult(result); 468 FriendsOfFriendsProcessorRequest f = builder.build(); 469 return f; 470 } 471 472 @Override 473 public void initialize(FriendsOfFriendsProcessorRequest request) 474 throws IOException { 475 this.person = request.getPerson().toByteArray(); 476 this.row = request.getRow().toByteArray(); 477 result.clear(); 478 result.addAll(request.getResultList()); 479 } 480 } 481 482 public static class RowSwapProcessor extends 483 BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> { 484 byte[] row1 = new byte[0]; 485 byte[] row2 = new byte[0]; 486 487 /** 488 * Empty constructor for Writable 489 */ 490 RowSwapProcessor() { 491 } 492 493 RowSwapProcessor(byte[] row1, byte[] row2) { 494 this.row1 = row1; 495 this.row2 = row2; 496 } 497 498 @Override 499 public Collection<byte[]> getRowsToLock() { 500 List<byte[]> rows = new ArrayList<>(2); 501 rows.add(row1); 502 rows.add(row2); 503 return rows; 504 } 505 506 @Override 507 public boolean readOnly() { 508 return false; 509 } 510 511 @Override 512 public RowSwapProcessorResponse getResult() { 513 return RowSwapProcessorResponse.getDefaultInstance(); 514 } 515 516 @Override 517 public void process(long now, HRegion region, 518 List<Mutation> mutations, WALEdit walEdit) throws IOException { 519 520 // Override the time to avoid race-condition in the unit test caused by 521 // inacurate timer on some machines 522 now = myTimer.getAndIncrement(); 523 524 // Scan both rows 525 List<Cell> kvs1 = new ArrayList<>(); 526 List<Cell> kvs2 = new ArrayList<>(); 527 doScan(region, new Scan(row1, row1), kvs1); 528 doScan(region, new Scan(row2, row2), kvs2); 529 530 // Assert swapped 531 if (swapped) { 532 assertEquals(rowSize, kvs2.size()); 533 assertEquals(row2Size, kvs1.size()); 534 } else { 535 assertEquals(rowSize, kvs1.size()); 536 assertEquals(row2Size, kvs2.size()); 537 } 538 swapped = !swapped; 539 540 // Add and delete keyvalues 541 List<List<Cell>> kvs = new ArrayList<>(2); 542 kvs.add(kvs1); 543 kvs.add(kvs2); 544 byte[][] rows = new byte[][]{row1, row2}; 545 for (int i = 0; i < kvs.size(); ++i) { 546 for (Cell kv : kvs.get(i)) { 547 // Delete from the current row and add to the other row 548 Delete d = new Delete(rows[i]); 549 KeyValue kvDelete = 550 new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), 551 kv.getTimestamp(), KeyValue.Type.Delete); 552 d.add(kvDelete); 553 Put p = new Put(rows[1 - i]); 554 KeyValue kvAdd = 555 new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), 556 now, CellUtil.cloneValue(kv)); 557 p.add(kvAdd); 558 mutations.add(d); 559 walEdit.add(kvDelete); 560 mutations.add(p); 561 walEdit.add(kvAdd); 562 } 563 } 564 } 565 566 @Override 567 public String getName() { 568 return "swap"; 569 } 570 571 @Override 572 public RowSwapProcessorRequest getRequestData() throws IOException { 573 RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder(); 574 builder.setRow1(ByteStringer.wrap(row1)); 575 builder.setRow2(ByteStringer.wrap(row2)); 576 return builder.build(); 577 } 578 579 @Override 580 public void initialize(RowSwapProcessorRequest msg) { 581 this.row1 = msg.getRow1().toByteArray(); 582 this.row2 = msg.getRow2().toByteArray(); 583 } 584 } 585 586 public static class TimeoutProcessor extends 587 BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> { 588 byte[] row = new byte[0]; 589 590 /** 591 * Empty constructor for Writable 592 */ 593 public TimeoutProcessor() { 594 } 595 596 public TimeoutProcessor(byte[] row) { 597 this.row = row; 598 } 599 600 public Collection<byte[]> getRowsToLock() { 601 return Collections.singleton(row); 602 } 603 604 @Override 605 public TimeoutProcessorResponse getResult() { 606 return TimeoutProcessorResponse.getDefaultInstance(); 607 } 608 609 @Override 610 public void process(long now, HRegion region, 611 List<Mutation> mutations, WALEdit walEdit) throws IOException { 612 try { 613 // Sleep for a long time so it timeout 614 Thread.sleep(100 * 1000L); 615 } catch (Exception e) { 616 throw new IOException(e); 617 } 618 } 619 620 @Override 621 public boolean readOnly() { 622 return true; 623 } 624 625 @Override 626 public String getName() { 627 return "timeout"; 628 } 629 630 @Override 631 public TimeoutProcessorRequest getRequestData() throws IOException { 632 TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder(); 633 builder.setRow(ByteStringer.wrap(row)); 634 return builder.build(); 635 } 636 637 @Override 638 public void initialize(TimeoutProcessorRequest msg) throws IOException { 639 this.row = msg.getRow().toByteArray(); 640 } 641 } 642 643 public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException { 644 InternalScanner scanner = null; 645 try { 646 scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); 647 scanner = region.getScanner(scan); 648 result.clear(); 649 scanner.next(result); 650 } finally { 651 if (scanner != null) { 652 scanner.close(); 653 } 654 } 655 } 656 } 657 658 static String stringifyKvs(Collection<Cell> kvs) { 659 StringBuilder out = new StringBuilder(); 660 out.append("["); 661 if (kvs != null) { 662 for (Cell kv : kvs) { 663 byte[] col = CellUtil.cloneQualifier(kv); 664 byte[] val = CellUtil.cloneValue(kv); 665 if (Bytes.equals(col, COUNTER)) { 666 out.append(Bytes.toStringBinary(col) + ":" + 667 Bytes.toInt(val) + " "); 668 } else { 669 out.append(Bytes.toStringBinary(col) + ":" + 670 Bytes.toStringBinary(val) + " "); 671 } 672 } 673 } 674 out.append("]"); 675 return out.toString(); 676 } 677}