001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue; 021import static org.junit.Assert.assertArrayEquals; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.List; 032import java.util.concurrent.TimeUnit; 033import java.util.function.Consumer; 034import java.util.stream.IntStream; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HColumnDescriptor; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionInfo; 043import org.apache.hadoop.hbase.HRegionLocation; 044import org.apache.hadoop.hbase.HTestConst; 045import org.apache.hadoop.hbase.KeyValue; 046import org.apache.hadoop.hbase.MiniHBaseCluster; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.TableNotFoundException; 049import org.apache.hadoop.hbase.filter.BinaryComparator; 050import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 051import org.apache.hadoop.hbase.filter.ColumnRangeFilter; 052import org.apache.hadoop.hbase.filter.QualifierFilter; 053import org.apache.hadoop.hbase.regionserver.HRegionServer; 054import org.apache.hadoop.hbase.testclassification.ClientTests; 055import org.apache.hadoop.hbase.testclassification.MediumTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.junit.After; 059import org.junit.AfterClass; 060import org.junit.Before; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Rule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.junit.rules.TestName; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070/** 071 * A client-side test, mostly testing scanners with various parameters. 072 */ 073@Category({MediumTests.class, ClientTests.class}) 074public class TestScannersFromClientSide { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestScannersFromClientSide.class); 079 080 private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); 081 082 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 083 private static byte [] ROW = Bytes.toBytes("testRow"); 084 private static byte [] FAMILY = Bytes.toBytes("testFamily"); 085 private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); 086 private static byte [] VALUE = Bytes.toBytes("testValue"); 087 088 @Rule 089 public TestName name = new TestName(); 090 091 /** 092 * @throws java.lang.Exception 093 */ 094 @BeforeClass 095 public static void setUpBeforeClass() throws Exception { 096 Configuration conf = TEST_UTIL.getConfiguration(); 097 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); 098 TEST_UTIL.startMiniCluster(3); 099 } 100 101 /** 102 * @throws java.lang.Exception 103 */ 104 @AfterClass 105 public static void tearDownAfterClass() throws Exception { 106 TEST_UTIL.shutdownMiniCluster(); 107 } 108 109 /** 110 * @throws java.lang.Exception 111 */ 112 @Before 113 public void setUp() throws Exception { 114 // Nothing to do. 115 } 116 117 /** 118 * @throws java.lang.Exception 119 */ 120 @After 121 public void tearDown() throws Exception { 122 // Nothing to do. 123 } 124 125 /** 126 * Test from client side for batch of scan 127 * 128 * @throws Exception 129 */ 130 @Test 131 public void testScanBatch() throws Exception { 132 final TableName tableName = TableName.valueOf(name.getMethodName()); 133 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); 134 135 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 136 137 Put put; 138 Scan scan; 139 Delete delete; 140 Result result; 141 ResultScanner scanner; 142 boolean toLog = true; 143 List<Cell> kvListExp; 144 145 // table: row, family, c0:0, c1:1, ... , c7:7 146 put = new Put(ROW); 147 for (int i=0; i < QUALIFIERS.length; i++) { 148 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 149 put.add(kv); 150 } 151 ht.put(put); 152 153 // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7 154 put = new Put(ROW); 155 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE); 156 put.add(kv); 157 ht.put(put); 158 159 // delete upto ts: 3 160 delete = new Delete(ROW); 161 delete.addFamily(FAMILY, 3); 162 ht.delete(delete); 163 164 // without batch 165 scan = new Scan().withStartRow(ROW); 166 scan.setMaxVersions(); 167 scanner = ht.getScanner(scan); 168 169 // c4:4, c5:5, c6:6, c7:7 170 kvListExp = new ArrayList<>(); 171 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 172 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 173 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 174 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 175 result = scanner.next(); 176 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 177 178 // with batch 179 scan = new Scan().withStartRow(ROW); 180 scan.setMaxVersions(); 181 scan.setBatch(2); 182 scanner = ht.getScanner(scan); 183 184 // First batch: c4:4, c5:5 185 kvListExp = new ArrayList<>(); 186 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 187 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 188 result = scanner.next(); 189 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 190 191 // Second batch: c6:6, c7:7 192 kvListExp = new ArrayList<>(); 193 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 194 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 195 result = scanner.next(); 196 verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); 197 198 } 199 200 @Test 201 public void testMaxResultSizeIsSetToDefault() throws Exception { 202 final TableName tableName = TableName.valueOf(name.getMethodName()); 203 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 204 205 // The max result size we expect the scan to use by default. 206 long expectedMaxResultSize = 207 TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 208 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 209 210 int numRows = 5; 211 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 212 213 int numQualifiers = 10; 214 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 215 216 // Specify the cell size such that a single row will be larger than the default 217 // value of maxResultSize. This means that Scan RPCs should return at most a single 218 // result back to the client. 219 int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); 220 byte[] cellValue = Bytes.createMaxByteArray(cellSize); 221 222 Put put; 223 List<Put> puts = new ArrayList<>(); 224 for (int row = 0; row < ROWS.length; row++) { 225 put = new Put(ROWS[row]); 226 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 227 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue); 228 put.add(kv); 229 } 230 puts.add(put); 231 } 232 ht.put(puts); 233 234 // Create a scan with the default configuration. 235 Scan scan = new Scan(); 236 237 ResultScanner scanner = ht.getScanner(scan); 238 assertTrue(scanner instanceof ClientScanner); 239 ClientScanner clientScanner = (ClientScanner) scanner; 240 241 // Call next to issue a single RPC to the server 242 scanner.next(); 243 244 // The scanner should have, at most, a single result in its cache. If there more results exists 245 // in the cache it means that more than the expected max result size was fetched. 246 assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results", 247 clientScanner.getCacheSize() <= 1); 248 } 249 250 /** 251 * Scan on not existing table should throw the exception with correct message 252 */ 253 @Test 254 public void testScannerForNotExistingTable() { 255 String[] tableNames = {"A", "Z", "A:A", "Z:Z"}; 256 for(String tableName : tableNames) { 257 try { 258 Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName)); 259 testSmallScan(table, true, 1, 5); 260 fail("TableNotFoundException was not thrown"); 261 } catch (TableNotFoundException e) { 262 // We expect that the message for TableNotFoundException would have only the table name only 263 // Otherwise that would mean that localeRegionInMeta doesn't work properly 264 assertEquals(e.getMessage(), tableName); 265 } catch (Exception e) { 266 fail("Unexpected exception " + e.getMessage()); 267 } 268 } 269 } 270 271 @Test 272 public void testSmallScan() throws Exception { 273 final TableName tableName = TableName.valueOf(name.getMethodName()); 274 275 int numRows = 10; 276 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 277 278 int numQualifiers = 10; 279 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 280 281 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 282 283 Put put; 284 List<Put> puts = new ArrayList<>(); 285 for (int row = 0; row < ROWS.length; row++) { 286 put = new Put(ROWS[row]); 287 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 288 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); 289 put.add(kv); 290 } 291 puts.add(put); 292 } 293 ht.put(puts); 294 295 int expectedRows = numRows; 296 int expectedCols = numRows * numQualifiers; 297 298 // Test normal and reversed 299 testSmallScan(ht, true, expectedRows, expectedCols); 300 testSmallScan(ht, false, expectedRows, expectedCols); 301 } 302 303 /** 304 * Run through a variety of test configurations with a small scan 305 * @param table 306 * @param reversed 307 * @param rows 308 * @param columns 309 * @throws Exception 310 */ 311 private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { 312 Scan baseScan = new Scan(); 313 baseScan.setReversed(reversed); 314 baseScan.setSmall(true); 315 316 Scan scan = new Scan(baseScan); 317 verifyExpectedCounts(table, scan, rows, columns); 318 319 scan = new Scan(baseScan); 320 scan.setMaxResultSize(1); 321 verifyExpectedCounts(table, scan, rows, columns); 322 323 scan = new Scan(baseScan); 324 scan.setMaxResultSize(1); 325 scan.setCaching(Integer.MAX_VALUE); 326 verifyExpectedCounts(table, scan, rows, columns); 327 } 328 329 private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, 330 int expectedCellCount) throws Exception { 331 ResultScanner scanner = table.getScanner(scan); 332 333 int rowCount = 0; 334 int cellCount = 0; 335 Result r = null; 336 while ((r = scanner.next()) != null) { 337 rowCount++; 338 cellCount += r.rawCells().length; 339 } 340 341 assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, 342 expectedRowCount == rowCount); 343 assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, 344 expectedCellCount == cellCount); 345 scanner.close(); 346 } 347 348 /** 349 * Test from client side for get with maxResultPerCF set 350 * 351 * @throws Exception 352 */ 353 @Test 354 public void testGetMaxResults() throws Exception { 355 final TableName tableName = TableName.valueOf(name.getMethodName()); 356 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 357 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 358 359 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 360 361 Get get; 362 Put put; 363 Result result; 364 boolean toLog = true; 365 List<Cell> kvListExp; 366 367 kvListExp = new ArrayList<>(); 368 // Insert one CF for row[0] 369 put = new Put(ROW); 370 for (int i=0; i < 10; i++) { 371 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 372 put.add(kv); 373 kvListExp.add(kv); 374 } 375 ht.put(put); 376 377 get = new Get(ROW); 378 result = ht.get(get); 379 verifyResult(result, kvListExp, toLog, "Testing without setting maxResults"); 380 381 get = new Get(ROW); 382 get.setMaxResultsPerColumnFamily(2); 383 result = ht.get(get); 384 kvListExp = new ArrayList<>(); 385 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE)); 386 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 387 verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults"); 388 389 // Filters: ColumnRangeFilter 390 get = new Get(ROW); 391 get.setMaxResultsPerColumnFamily(5); 392 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], 393 true)); 394 result = ht.get(get); 395 kvListExp = new ArrayList<>(); 396 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE)); 397 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 398 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 399 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 400 verifyResult(result, kvListExp, toLog, "Testing single CF with CRF"); 401 402 // Insert two more CF for row[0] 403 // 20 columns for CF2, 10 columns for CF1 404 put = new Put(ROW); 405 for (int i=0; i < QUALIFIERS.length; i++) { 406 KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE); 407 put.add(kv); 408 } 409 ht.put(put); 410 411 put = new Put(ROW); 412 for (int i=0; i < 10; i++) { 413 KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE); 414 put.add(kv); 415 } 416 ht.put(put); 417 418 get = new Get(ROW); 419 get.setMaxResultsPerColumnFamily(12); 420 get.addFamily(FAMILIES[1]); 421 get.addFamily(FAMILIES[2]); 422 result = ht.get(get); 423 kvListExp = new ArrayList<>(); 424 //Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19 425 for (int i=0; i < 10; i++) { 426 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 427 } 428 for (int i=0; i < 2; i++) { 429 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 430 } 431 for (int i=10; i < 20; i++) { 432 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 433 } 434 verifyResult(result, kvListExp, toLog, "Testing multiple CFs"); 435 436 // Filters: ColumnRangeFilter and ColumnPrefixFilter 437 get = new Get(ROW); 438 get.setMaxResultsPerColumnFamily(3); 439 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true)); 440 result = ht.get(get); 441 kvListExp = new ArrayList<>(); 442 for (int i=2; i < 5; i++) { 443 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 444 } 445 for (int i=2; i < 5; i++) { 446 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 447 } 448 for (int i=2; i < 5; i++) { 449 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 450 } 451 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF"); 452 453 get = new Get(ROW); 454 get.setMaxResultsPerColumnFamily(7); 455 get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1])); 456 result = ht.get(get); 457 kvListExp = new ArrayList<>(); 458 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 459 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE)); 460 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE)); 461 for (int i=10; i < 16; i++) { 462 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 463 } 464 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF"); 465 466 } 467 468 /** 469 * Test from client side for scan with maxResultPerCF set 470 * 471 * @throws Exception 472 */ 473 @Test 474 public void testScanMaxResults() throws Exception { 475 final TableName tableName = TableName.valueOf(name.getMethodName()); 476 byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); 477 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 478 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); 479 480 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 481 482 Put put; 483 Scan scan; 484 Result result; 485 boolean toLog = true; 486 List<Cell> kvListExp, kvListScan; 487 488 kvListExp = new ArrayList<>(); 489 490 for (int r=0; r < ROWS.length; r++) { 491 put = new Put(ROWS[r]); 492 for (int c=0; c < FAMILIES.length; c++) { 493 for (int q=0; q < QUALIFIERS.length; q++) { 494 KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); 495 put.add(kv); 496 if (q < 4) { 497 kvListExp.add(kv); 498 } 499 } 500 } 501 ht.put(put); 502 } 503 504 scan = new Scan(); 505 scan.setMaxResultsPerColumnFamily(4); 506 ResultScanner scanner = ht.getScanner(scan); 507 kvListScan = new ArrayList<>(); 508 while ((result = scanner.next()) != null) { 509 for (Cell kv : result.listCells()) { 510 kvListScan.add(kv); 511 } 512 } 513 result = Result.create(kvListScan); 514 verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); 515 516 } 517 518 /** 519 * Test from client side for get with rowOffset 520 * 521 * @throws Exception 522 */ 523 @Test 524 public void testGetRowOffset() throws Exception { 525 final TableName tableName = TableName.valueOf(name.getMethodName()); 526 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 527 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 528 529 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 530 531 Get get; 532 Put put; 533 Result result; 534 boolean toLog = true; 535 List<Cell> kvListExp; 536 537 // Insert one CF for row 538 kvListExp = new ArrayList<>(); 539 put = new Put(ROW); 540 for (int i=0; i < 10; i++) { 541 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 542 put.add(kv); 543 // skipping first two kvs 544 if (i < 2) continue; 545 kvListExp.add(kv); 546 } 547 ht.put(put); 548 549 //setting offset to 2 550 get = new Get(ROW); 551 get.setRowOffsetPerColumnFamily(2); 552 result = ht.get(get); 553 verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset"); 554 555 //setting offset to 20 556 get = new Get(ROW); 557 get.setRowOffsetPerColumnFamily(20); 558 result = ht.get(get); 559 kvListExp = new ArrayList<>(); 560 verifyResult(result, kvListExp, toLog, "Testing offset > #kvs"); 561 562 //offset + maxResultPerCF 563 get = new Get(ROW); 564 get.setRowOffsetPerColumnFamily(4); 565 get.setMaxResultsPerColumnFamily(5); 566 result = ht.get(get); 567 kvListExp = new ArrayList<>(); 568 for (int i=4; i < 9; i++) { 569 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 570 } 571 verifyResult(result, kvListExp, toLog, 572 "Testing offset + setMaxResultsPerCF"); 573 574 // Filters: ColumnRangeFilter 575 get = new Get(ROW); 576 get.setRowOffsetPerColumnFamily(1); 577 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], 578 true)); 579 result = ht.get(get); 580 kvListExp = new ArrayList<>(); 581 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 582 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 583 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 584 verifyResult(result, kvListExp, toLog, "Testing offset with CRF"); 585 586 // Insert into two more CFs for row 587 // 10 columns for CF2, 10 columns for CF1 588 for(int j=2; j > 0; j--) { 589 put = new Put(ROW); 590 for (int i=0; i < 10; i++) { 591 KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE); 592 put.add(kv); 593 } 594 ht.put(put); 595 } 596 597 get = new Get(ROW); 598 get.setRowOffsetPerColumnFamily(4); 599 get.setMaxResultsPerColumnFamily(2); 600 get.addFamily(FAMILIES[1]); 601 get.addFamily(FAMILIES[2]); 602 result = ht.get(get); 603 kvListExp = new ArrayList<>(); 604 //Exp: CF1:q4, q5, CF2: q4, q5 605 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE)); 606 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE)); 607 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE)); 608 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); 609 verifyResult(result, kvListExp, toLog, 610 "Testing offset + multiple CFs + maxResults"); 611 } 612 613 /** 614 * Test from client side for scan while the region is reopened 615 * on the same region server. 616 * 617 * @throws Exception 618 */ 619 @Test 620 public void testScanOnReopenedRegion() throws Exception { 621 final TableName tableName = TableName.valueOf(name.getMethodName()); 622 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2); 623 624 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 625 626 Put put; 627 Scan scan; 628 Result result; 629 ResultScanner scanner; 630 boolean toLog = false; 631 List<Cell> kvListExp; 632 633 // table: row, family, c0:0, c1:1 634 put = new Put(ROW); 635 for (int i=0; i < QUALIFIERS.length; i++) { 636 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 637 put.add(kv); 638 } 639 ht.put(put); 640 641 scan = new Scan().withStartRow(ROW); 642 scanner = ht.getScanner(scan); 643 644 HRegionLocation loc; 645 646 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 647 loc = locator.getRegionLocation(ROW); 648 } 649 HRegionInfo hri = loc.getRegionInfo(); 650 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 651 byte[] regionName = hri.getRegionName(); 652 int i = cluster.getServerWith(regionName); 653 HRegionServer rs = cluster.getRegionServer(i); 654 LOG.info("Unassigning " + hri); 655 TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true); 656 long startTime = EnvironmentEdgeManager.currentTime(); 657 long timeOut = 10000; 658 boolean offline = false; 659 while (true) { 660 if (rs.getOnlineRegion(regionName) == null) { 661 offline = true; 662 break; 663 } 664 assertTrue("Timed out in closing the testing region", 665 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 666 } 667 assertTrue(offline); 668 LOG.info("Assigning " + hri); 669 TEST_UTIL.getAdmin().assign(hri.getRegionName()); 670 startTime = EnvironmentEdgeManager.currentTime(); 671 while (true) { 672 rs = cluster.getRegionServer(cluster.getServerWith(regionName)); 673 if (rs != null && rs.getOnlineRegion(regionName) != null) { 674 offline = false; 675 break; 676 } 677 assertTrue("Timed out in open the testing region", 678 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 679 } 680 assertFalse(offline); 681 682 // c0:0, c1:1 683 kvListExp = new ArrayList<>(); 684 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE)); 685 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE)); 686 result = scanner.next(); 687 verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); 688 } 689 690 @Test 691 public void testAsyncScannerWithSmallData() throws Exception { 692 testAsyncScanner(TableName.valueOf(name.getMethodName()), 693 2, 694 3, 695 10, 696 -1, 697 null); 698 } 699 700 @Test 701 public void testAsyncScannerWithManyRows() throws Exception { 702 testAsyncScanner(TableName.valueOf(name.getMethodName()), 703 30000, 704 1, 705 1, 706 -1, 707 null); 708 } 709 710 @Test 711 public void testAsyncScannerWithoutCaching() throws Exception { 712 testAsyncScanner(TableName.valueOf(name.getMethodName()), 713 5, 714 1, 715 1, 716 1, 717 (b) -> { 718 try { 719 TimeUnit.MILLISECONDS.sleep(500); 720 } catch (InterruptedException ex) { 721 } 722 }); 723 } 724 725 private void testAsyncScanner(TableName table, int rowNumber, int familyNumber, 726 int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception { 727 assert rowNumber > 0; 728 assert familyNumber > 0; 729 assert qualifierNumber > 0; 730 byte[] row = Bytes.toBytes("r"); 731 byte[] family = Bytes.toBytes("f"); 732 byte[] qualifier = Bytes.toBytes("q"); 733 byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber); 734 byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber); 735 byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber); 736 737 Table ht = TEST_UTIL.createTable(table, families); 738 739 boolean toLog = true; 740 List<Cell> kvListExp = new ArrayList<>(); 741 742 List<Put> puts = new ArrayList<>(); 743 for (byte[] r : rows) { 744 Put put = new Put(r); 745 for (byte[] f : families) { 746 for (byte[] q : qualifiers) { 747 KeyValue kv = new KeyValue(r, f, q, 1, VALUE); 748 put.add(kv); 749 kvListExp.add(kv); 750 } 751 } 752 puts.add(put); 753 if (puts.size() > 1000) { 754 ht.put(puts); 755 puts.clear(); 756 } 757 } 758 if (!puts.isEmpty()) { 759 ht.put(puts); 760 puts.clear(); 761 } 762 763 Scan scan = new Scan(); 764 scan.setAsyncPrefetch(true); 765 if (caching > 0) { 766 scan.setCaching(caching); 767 } 768 try (ResultScanner scanner = ht.getScanner(scan)) { 769 assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); 770 ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener); 771 List<Cell> kvListScan = new ArrayList<>(); 772 Result result; 773 boolean first = true; 774 int actualRows = 0; 775 while ((result = scanner.next()) != null) { 776 ++actualRows; 777 // waiting for cache. see HBASE-17376 778 if (first) { 779 TimeUnit.SECONDS.sleep(1); 780 first = false; 781 } 782 for (Cell kv : result.listCells()) { 783 kvListScan.add(kv); 784 } 785 } 786 assertEquals(rowNumber, actualRows); 787 // These cells may have different rows but it is ok. The Result#getRow 788 // isn't used in the verifyResult() 789 result = Result.create(kvListScan); 790 verifyResult(result, kvListExp, toLog, "Testing async scan"); 791 } 792 793 TEST_UTIL.deleteTable(table); 794 } 795 796 private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) { 797 int maxLength = Integer.toString(n).length(); 798 byte [][] ret = new byte[n][]; 799 for (int i = 0; i < n; i++) { 800 int length = Integer.toString(i).length(); 801 StringBuilder buf = new StringBuilder(Integer.toString(i)); 802 IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0")); 803 byte[] tail = Bytes.toBytes(buf.toString()); 804 ret[i] = Bytes.add(base, tail); 805 } 806 return ret; 807 } 808 809 static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, 810 String msg) { 811 812 LOG.info(msg); 813 LOG.info("Expected count: " + expKvList.size()); 814 LOG.info("Actual count: " + result.size()); 815 if (expKvList.isEmpty()) 816 return; 817 818 int i = 0; 819 for (Cell kv : result.rawCells()) { 820 if (i >= expKvList.size()) { 821 break; // we will check the size later 822 } 823 824 Cell kvExp = expKvList.get(i++); 825 if (toLog) { 826 LOG.info("get kv is: " + kv.toString()); 827 LOG.info("exp kv is: " + kvExp.toString()); 828 } 829 assertTrue("Not equal", kvExp.equals(kv)); 830 } 831 832 assertEquals(expKvList.size(), result.size()); 833 } 834 835 @Test 836 public void testReadExpiredDataForRawScan() throws IOException { 837 TableName tableName = TableName.valueOf(name.getMethodName()); 838 long ts = System.currentTimeMillis() - 10000; 839 byte[] value = Bytes.toBytes("expired"); 840 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 841 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value)); 842 assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)); 843 TEST_UTIL.getAdmin().modifyColumnFamily(tableName, 844 new HColumnDescriptor(FAMILY).setTimeToLive(5)); 845 try (ResultScanner scanner = table.getScanner(FAMILY)) { 846 assertNull(scanner.next()); 847 } 848 try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) { 849 assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER)); 850 assertNull(scanner.next()); 851 } 852 } 853 } 854 855 @Test 856 public void testScanWithColumnsAndFilterAndVersion() throws IOException { 857 TableName tableName = TableName.valueOf(name.getMethodName()); 858 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) { 859 for (int i = 0; i < 4; i++) { 860 Put put = new Put(ROW); 861 put.addColumn(FAMILY, QUALIFIER, VALUE); 862 table.put(put); 863 } 864 865 Scan scan = new Scan(); 866 scan.addColumn(FAMILY, QUALIFIER); 867 scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER))); 868 scan.readVersions(3); 869 870 try (ResultScanner scanner = table.getScanner(scan)) { 871 Result result = scanner.next(); 872 assertEquals(3, result.size()); 873 } 874 } 875 } 876 877 @Test 878 public void testScanWithSameStartRowStopRow() throws IOException { 879 TableName tableName = TableName.valueOf(name.getMethodName()); 880 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 881 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 882 883 Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW); 884 try (ResultScanner scanner = table.getScanner(scan)) { 885 assertNull(scanner.next()); 886 } 887 888 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true); 889 try (ResultScanner scanner = table.getScanner(scan)) { 890 Result result = scanner.next(); 891 assertNotNull(result); 892 assertArrayEquals(ROW, result.getRow()); 893 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 894 assertNull(scanner.next()); 895 } 896 897 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false); 898 try (ResultScanner scanner = table.getScanner(scan)) { 899 assertNull(scanner.next()); 900 } 901 902 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false); 903 try (ResultScanner scanner = table.getScanner(scan)) { 904 assertNull(scanner.next()); 905 } 906 907 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true); 908 try (ResultScanner scanner = table.getScanner(scan)) { 909 assertNull(scanner.next()); 910 } 911 } 912 } 913 914 @Test 915 public void testReverseScanWithFlush() throws Exception { 916 TableName tableName = TableName.valueOf(name.getMethodName()); 917 final int BATCH_SIZE = 10; 918 final int ROWS_TO_INSERT = 100; 919 final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); 920 921 try (Table table = TEST_UTIL.createTable(tableName, FAMILY); 922 Admin admin = TEST_UTIL.getAdmin()) { 923 List<Put> putList = new ArrayList<>(); 924 for (long i = 0; i < ROWS_TO_INSERT; i++) { 925 Put put = new Put(Bytes.toBytes(i)); 926 put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); 927 putList.add(put); 928 929 if (putList.size() >= BATCH_SIZE) { 930 table.put(putList); 931 admin.flush(tableName); 932 putList.clear(); 933 } 934 } 935 936 if (!putList.isEmpty()) { 937 table.put(putList); 938 admin.flush(tableName); 939 putList.clear(); 940 } 941 942 Scan scan = new Scan(); 943 scan.setReversed(true); 944 int count = 0; 945 946 try (ResultScanner results = table.getScanner(scan)) { 947 for (Result result : results) { 948 count++; 949 } 950 } 951 assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, 952 ROWS_TO_INSERT, count); 953 } 954 } 955}