001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; 021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue; 022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; 023import static org.junit.Assert.assertArrayEquals; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.List; 036import java.util.concurrent.TimeUnit; 037import java.util.function.Consumer; 038import java.util.stream.IntStream; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CompareOperator; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtility; 044import org.apache.hadoop.hbase.HColumnDescriptor; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HRegionInfo; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.HTestConst; 049import org.apache.hadoop.hbase.KeyValue; 050import org.apache.hadoop.hbase.MiniHBaseCluster; 051import org.apache.hadoop.hbase.StartMiniClusterOption; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.TableNameTestRule; 054import org.apache.hadoop.hbase.TableNotFoundException; 055import org.apache.hadoop.hbase.exceptions.DeserializationException; 056import org.apache.hadoop.hbase.filter.BinaryComparator; 057import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 058import org.apache.hadoop.hbase.filter.ColumnRangeFilter; 059import org.apache.hadoop.hbase.filter.FilterBase; 060import org.apache.hadoop.hbase.filter.QualifierFilter; 061import org.apache.hadoop.hbase.regionserver.HRegionServer; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.MediumTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.junit.AfterClass; 067import org.junit.Before; 068import org.junit.ClassRule; 069import org.junit.Rule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.junit.runner.RunWith; 073import org.junit.runners.Parameterized; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 078 079/** 080 * A client-side test, mostly testing scanners with various parameters. Parameterized on different 081 * registry implementations. 082 */ 083@Category({MediumTests.class, ClientTests.class}) 084@RunWith(Parameterized.class) 085public class TestScannersFromClientSide { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestScannersFromClientSide.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); 092 093 private static HBaseTestingUtility TEST_UTIL; 094 private static byte [] ROW = Bytes.toBytes("testRow"); 095 private static byte [] FAMILY = Bytes.toBytes("testFamily"); 096 private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); 097 private static byte [] VALUE = Bytes.toBytes("testValue"); 098 099 @Rule public TableNameTestRule name = new TableNameTestRule(); 100 101 /** 102 * @throws java.lang.Exception 103 */ 104 @AfterClass 105 public static void tearDownAfterClass() throws Exception { 106 if (TEST_UTIL != null) { 107 TEST_UTIL.shutdownMiniCluster(); 108 } 109 } 110 111 /** 112 * @throws java.lang.Exception 113 */ 114 @Before 115 public void setUp() throws Exception { 116 // Nothing to do. 117 } 118 119 @Parameterized.Parameters 120 public static Collection<Object[]> parameters() { 121 return Arrays.asList(new Object[][] { 122 { MasterRegistry.class, 1}, 123 { MasterRegistry.class, 2}, 124 { ZKConnectionRegistry.class, 1} 125 }); 126 } 127 128 /** 129 * JUnit does not provide an easy way to run a hook after each parameterized run. Without that 130 * there is no easy way to restart the test cluster after each parameterized run. Annotation 131 * BeforeParam does not work either because it runs before parameterization and hence does not 132 * have access to the test parameters (which is weird). 133 * 134 * This *hack* checks if the current instance of test cluster configuration has the passed 135 * parameterized configs. In such a case, we can just reuse the cluster for test and do not need 136 * to initialize from scratch. While this is a hack, it saves a ton of time for the full 137 * test and de-flakes it. 138 */ 139 private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) { 140 // initialize() is called for every unit test, however we only want to reset the cluster state 141 // at the end of every parameterized run. 142 if (TEST_UTIL == null) { 143 return false; 144 } 145 Configuration conf = TEST_UTIL.getConfiguration(); 146 Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 147 ZKConnectionRegistry.class); 148 int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 149 MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT); 150 return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; 151 } 152 153 public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception { 154 if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { 155 return; 156 } 157 if (TEST_UTIL != null) { 158 // We reached the end of a parameterized run, clean up the cluster. 159 TEST_UTIL.shutdownMiniCluster(); 160 } 161 TEST_UTIL = new HBaseTestingUtility(); 162 Configuration conf = TEST_UTIL.getConfiguration(); 163 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); 164 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl, 165 ConnectionRegistry.class); 166 Preconditions.checkArgument(numHedgedReqs > 0); 167 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); 168 StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); 169 // Multiple masters needed only when hedged reads for master registry are enabled. 170 builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3); 171 TEST_UTIL.startMiniCluster(builder.build()); 172 } 173 174 /** 175 * Test from client side for batch of scan 176 * 177 * @throws Exception 178 */ 179 @Test 180 public void testScanBatch() throws Exception { 181 final TableName tableName = name.getTableName(); 182 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); 183 184 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 185 186 Put put; 187 Scan scan; 188 Delete delete; 189 Result result; 190 ResultScanner scanner; 191 boolean toLog = true; 192 List<Cell> kvListExp; 193 194 // table: row, family, c0:0, c1:1, ... , c7:7 195 put = new Put(ROW); 196 for (int i=0; i < QUALIFIERS.length; i++) { 197 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 198 put.add(kv); 199 } 200 ht.put(put); 201 202 // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7 203 put = new Put(ROW); 204 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE); 205 put.add(kv); 206 ht.put(put); 207 208 // delete upto ts: 3 209 delete = new Delete(ROW); 210 delete.addFamily(FAMILY, 3); 211 ht.delete(delete); 212 213 // without batch 214 scan = new Scan().withStartRow(ROW); 215 scan.setMaxVersions(); 216 scanner = ht.getScanner(scan); 217 218 // c4:4, c5:5, c6:6, c7:7 219 kvListExp = new ArrayList<>(); 220 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 221 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 222 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 223 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 224 result = scanner.next(); 225 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 226 227 // with batch 228 scan = new Scan().withStartRow(ROW); 229 scan.setMaxVersions(); 230 scan.setBatch(2); 231 scanner = ht.getScanner(scan); 232 233 // First batch: c4:4, c5:5 234 kvListExp = new ArrayList<>(); 235 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 236 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 237 result = scanner.next(); 238 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 239 240 // Second batch: c6:6, c7:7 241 kvListExp = new ArrayList<>(); 242 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 243 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 244 result = scanner.next(); 245 verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); 246 247 } 248 249 @Test 250 public void testMaxResultSizeIsSetToDefault() throws Exception { 251 final TableName tableName = name.getTableName(); 252 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 253 254 // The max result size we expect the scan to use by default. 255 long expectedMaxResultSize = 256 TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 257 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 258 259 int numRows = 5; 260 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 261 262 int numQualifiers = 10; 263 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 264 265 // Specify the cell size such that a single row will be larger than the default 266 // value of maxResultSize. This means that Scan RPCs should return at most a single 267 // result back to the client. 268 int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); 269 byte[] cellValue = Bytes.createMaxByteArray(cellSize); 270 271 Put put; 272 List<Put> puts = new ArrayList<>(); 273 for (int row = 0; row < ROWS.length; row++) { 274 put = new Put(ROWS[row]); 275 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 276 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue); 277 put.add(kv); 278 } 279 puts.add(put); 280 } 281 ht.put(puts); 282 283 // Create a scan with the default configuration. 284 Scan scan = new Scan(); 285 286 ResultScanner scanner = ht.getScanner(scan); 287 assertTrue(scanner instanceof ClientScanner); 288 ClientScanner clientScanner = (ClientScanner) scanner; 289 290 // Call next to issue a single RPC to the server 291 scanner.next(); 292 293 // The scanner should have, at most, a single result in its cache. If there more results exists 294 // in the cache it means that more than the expected max result size was fetched. 295 assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results", 296 clientScanner.getCacheSize() <= 1); 297 } 298 299 /** 300 * Scan on not existing table should throw the exception with correct message 301 */ 302 @Test 303 public void testScannerForNotExistingTable() { 304 String[] tableNames = {"A", "Z", "A:A", "Z:Z"}; 305 for(String tableName : tableNames) { 306 try { 307 Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName)); 308 testSmallScan(table, true, 1, 5); 309 fail("TableNotFoundException was not thrown"); 310 } catch (TableNotFoundException e) { 311 // We expect that the message for TableNotFoundException would have only the table name only 312 // Otherwise that would mean that localeRegionInMeta doesn't work properly 313 assertEquals(e.getMessage(), tableName); 314 } catch (Exception e) { 315 fail("Unexpected exception " + e.getMessage()); 316 } 317 } 318 } 319 320 @Test 321 public void testSmallScan() throws Exception { 322 final TableName tableName = name.getTableName(); 323 324 int numRows = 10; 325 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 326 327 int numQualifiers = 10; 328 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 329 330 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 331 332 Put put; 333 List<Put> puts = new ArrayList<>(); 334 for (int row = 0; row < ROWS.length; row++) { 335 put = new Put(ROWS[row]); 336 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 337 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); 338 put.add(kv); 339 } 340 puts.add(put); 341 } 342 ht.put(puts); 343 344 int expectedRows = numRows; 345 int expectedCols = numRows * numQualifiers; 346 347 // Test normal and reversed 348 testSmallScan(ht, true, expectedRows, expectedCols); 349 testSmallScan(ht, false, expectedRows, expectedCols); 350 } 351 352 /** 353 * Run through a variety of test configurations with a small scan 354 * @param table 355 * @param reversed 356 * @param rows 357 * @param columns 358 * @throws Exception 359 */ 360 private void testSmallScan( 361 Table table, boolean reversed, int rows, int columns) throws Exception { 362 Scan baseScan = new Scan(); 363 baseScan.setReversed(reversed); 364 baseScan.setSmall(true); 365 366 Scan scan = new Scan(baseScan); 367 verifyExpectedCounts(table, scan, rows, columns); 368 369 scan = new Scan(baseScan); 370 scan.setMaxResultSize(1); 371 verifyExpectedCounts(table, scan, rows, columns); 372 373 scan = new Scan(baseScan); 374 scan.setMaxResultSize(1); 375 scan.setCaching(Integer.MAX_VALUE); 376 verifyExpectedCounts(table, scan, rows, columns); 377 } 378 379 private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, 380 int expectedCellCount) throws Exception { 381 ResultScanner scanner = table.getScanner(scan); 382 383 int rowCount = 0; 384 int cellCount = 0; 385 Result r = null; 386 while ((r = scanner.next()) != null) { 387 rowCount++; 388 cellCount += r.rawCells().length; 389 } 390 391 assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, 392 expectedRowCount == rowCount); 393 assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, 394 expectedCellCount == cellCount); 395 scanner.close(); 396 } 397 398 /** 399 * Test from client side for get with maxResultPerCF set 400 * 401 * @throws Exception 402 */ 403 @Test 404 public void testGetMaxResults() throws Exception { 405 final TableName tableName = name.getTableName(); 406 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 407 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 408 409 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 410 411 Get get; 412 Put put; 413 Result result; 414 boolean toLog = true; 415 List<Cell> kvListExp; 416 417 kvListExp = new ArrayList<>(); 418 // Insert one CF for row[0] 419 put = new Put(ROW); 420 for (int i=0; i < 10; i++) { 421 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 422 put.add(kv); 423 kvListExp.add(kv); 424 } 425 ht.put(put); 426 427 get = new Get(ROW); 428 result = ht.get(get); 429 verifyResult(result, kvListExp, toLog, "Testing without setting maxResults"); 430 431 get = new Get(ROW); 432 get.setMaxResultsPerColumnFamily(2); 433 result = ht.get(get); 434 kvListExp = new ArrayList<>(); 435 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE)); 436 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 437 verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults"); 438 439 // Filters: ColumnRangeFilter 440 get = new Get(ROW); 441 get.setMaxResultsPerColumnFamily(5); 442 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], 443 true)); 444 result = ht.get(get); 445 kvListExp = new ArrayList<>(); 446 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE)); 447 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 448 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 449 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 450 verifyResult(result, kvListExp, toLog, "Testing single CF with CRF"); 451 452 // Insert two more CF for row[0] 453 // 20 columns for CF2, 10 columns for CF1 454 put = new Put(ROW); 455 for (int i=0; i < QUALIFIERS.length; i++) { 456 KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE); 457 put.add(kv); 458 } 459 ht.put(put); 460 461 put = new Put(ROW); 462 for (int i=0; i < 10; i++) { 463 KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE); 464 put.add(kv); 465 } 466 ht.put(put); 467 468 get = new Get(ROW); 469 get.setMaxResultsPerColumnFamily(12); 470 get.addFamily(FAMILIES[1]); 471 get.addFamily(FAMILIES[2]); 472 result = ht.get(get); 473 kvListExp = new ArrayList<>(); 474 //Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19 475 for (int i=0; i < 10; i++) { 476 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 477 } 478 for (int i=0; i < 2; i++) { 479 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 480 } 481 for (int i=10; i < 20; i++) { 482 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 483 } 484 verifyResult(result, kvListExp, toLog, "Testing multiple CFs"); 485 486 // Filters: ColumnRangeFilter and ColumnPrefixFilter 487 get = new Get(ROW); 488 get.setMaxResultsPerColumnFamily(3); 489 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true)); 490 result = ht.get(get); 491 kvListExp = new ArrayList<>(); 492 for (int i=2; i < 5; i++) { 493 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 494 } 495 for (int i=2; i < 5; i++) { 496 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 497 } 498 for (int i=2; i < 5; i++) { 499 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 500 } 501 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF"); 502 503 get = new Get(ROW); 504 get.setMaxResultsPerColumnFamily(7); 505 get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1])); 506 result = ht.get(get); 507 kvListExp = new ArrayList<>(); 508 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 509 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE)); 510 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE)); 511 for (int i=10; i < 16; i++) { 512 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 513 } 514 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF"); 515 516 } 517 518 /** 519 * Test from client side for scan with maxResultPerCF set 520 * 521 * @throws Exception 522 */ 523 @Test 524 public void testScanMaxResults() throws Exception { 525 final TableName tableName = name.getTableName(); 526 byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); 527 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 528 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); 529 530 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 531 532 Put put; 533 Scan scan; 534 Result result; 535 boolean toLog = true; 536 List<Cell> kvListExp, kvListScan; 537 538 kvListExp = new ArrayList<>(); 539 540 for (int r=0; r < ROWS.length; r++) { 541 put = new Put(ROWS[r]); 542 for (int c=0; c < FAMILIES.length; c++) { 543 for (int q=0; q < QUALIFIERS.length; q++) { 544 KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); 545 put.add(kv); 546 if (q < 4) { 547 kvListExp.add(kv); 548 } 549 } 550 } 551 ht.put(put); 552 } 553 554 scan = new Scan(); 555 scan.setMaxResultsPerColumnFamily(4); 556 ResultScanner scanner = ht.getScanner(scan); 557 kvListScan = new ArrayList<>(); 558 while ((result = scanner.next()) != null) { 559 for (Cell kv : result.listCells()) { 560 kvListScan.add(kv); 561 } 562 } 563 result = Result.create(kvListScan); 564 verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); 565 566 } 567 568 /** 569 * Test from client side for get with rowOffset 570 * 571 * @throws Exception 572 */ 573 @Test 574 public void testGetRowOffset() throws Exception { 575 final TableName tableName = name.getTableName(); 576 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 577 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 578 579 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 580 581 Get get; 582 Put put; 583 Result result; 584 boolean toLog = true; 585 List<Cell> kvListExp; 586 587 // Insert one CF for row 588 kvListExp = new ArrayList<>(); 589 put = new Put(ROW); 590 for (int i=0; i < 10; i++) { 591 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 592 put.add(kv); 593 // skipping first two kvs 594 if (i < 2) { 595 continue; 596 } 597 kvListExp.add(kv); 598 } 599 ht.put(put); 600 601 //setting offset to 2 602 get = new Get(ROW); 603 get.setRowOffsetPerColumnFamily(2); 604 result = ht.get(get); 605 verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset"); 606 607 //setting offset to 20 608 get = new Get(ROW); 609 get.setRowOffsetPerColumnFamily(20); 610 result = ht.get(get); 611 kvListExp = new ArrayList<>(); 612 verifyResult(result, kvListExp, toLog, "Testing offset > #kvs"); 613 614 //offset + maxResultPerCF 615 get = new Get(ROW); 616 get.setRowOffsetPerColumnFamily(4); 617 get.setMaxResultsPerColumnFamily(5); 618 result = ht.get(get); 619 kvListExp = new ArrayList<>(); 620 for (int i=4; i < 9; i++) { 621 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 622 } 623 verifyResult(result, kvListExp, toLog, 624 "Testing offset + setMaxResultsPerCF"); 625 626 // Filters: ColumnRangeFilter 627 get = new Get(ROW); 628 get.setRowOffsetPerColumnFamily(1); 629 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], 630 true)); 631 result = ht.get(get); 632 kvListExp = new ArrayList<>(); 633 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 634 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 635 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 636 verifyResult(result, kvListExp, toLog, "Testing offset with CRF"); 637 638 // Insert into two more CFs for row 639 // 10 columns for CF2, 10 columns for CF1 640 for(int j=2; j > 0; j--) { 641 put = new Put(ROW); 642 for (int i=0; i < 10; i++) { 643 KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE); 644 put.add(kv); 645 } 646 ht.put(put); 647 } 648 649 get = new Get(ROW); 650 get.setRowOffsetPerColumnFamily(4); 651 get.setMaxResultsPerColumnFamily(2); 652 get.addFamily(FAMILIES[1]); 653 get.addFamily(FAMILIES[2]); 654 result = ht.get(get); 655 kvListExp = new ArrayList<>(); 656 //Exp: CF1:q4, q5, CF2: q4, q5 657 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE)); 658 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE)); 659 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE)); 660 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); 661 verifyResult(result, kvListExp, toLog, 662 "Testing offset + multiple CFs + maxResults"); 663 } 664 665 @Test 666 public void testScanRawDeleteFamilyVersion() throws Exception { 667 TableName tableName = name.getTableName(); 668 TEST_UTIL.createTable(tableName, FAMILY); 669 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 670 conf.set(RPC_CODEC_CONF_KEY, ""); 671 conf.set(DEFAULT_CODEC_CLASS, ""); 672 try (Connection connection = ConnectionFactory.createConnection(conf); 673 Table table = connection.getTable(tableName)) { 674 Delete delete = new Delete(ROW); 675 delete.addFamilyVersion(FAMILY, 0L); 676 table.delete(delete); 677 Scan scan = new Scan(ROW).setRaw(true); 678 ResultScanner scanner = table.getScanner(scan); 679 int count = 0; 680 while (scanner.next() != null) { 681 count++; 682 } 683 assertEquals(1, count); 684 } finally { 685 TEST_UTIL.deleteTable(tableName); 686 } 687 } 688 689 /** 690 * Test from client side for scan while the region is reopened 691 * on the same region server. 692 * 693 * @throws Exception 694 */ 695 @Test 696 public void testScanOnReopenedRegion() throws Exception { 697 final TableName tableName = name.getTableName(); 698 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2); 699 700 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 701 702 Put put; 703 Scan scan; 704 Result result; 705 ResultScanner scanner; 706 boolean toLog = false; 707 List<Cell> kvListExp; 708 709 // table: row, family, c0:0, c1:1 710 put = new Put(ROW); 711 for (int i=0; i < QUALIFIERS.length; i++) { 712 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 713 put.add(kv); 714 } 715 ht.put(put); 716 717 scan = new Scan().withStartRow(ROW); 718 scanner = ht.getScanner(scan); 719 720 HRegionLocation loc; 721 722 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 723 loc = locator.getRegionLocation(ROW); 724 } 725 HRegionInfo hri = loc.getRegionInfo(); 726 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 727 byte[] regionName = hri.getRegionName(); 728 int i = cluster.getServerWith(regionName); 729 HRegionServer rs = cluster.getRegionServer(i); 730 LOG.info("Unassigning " + hri); 731 TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true); 732 long startTime = EnvironmentEdgeManager.currentTime(); 733 long timeOut = 10000; 734 boolean offline = false; 735 while (true) { 736 if (rs.getOnlineRegion(regionName) == null) { 737 offline = true; 738 break; 739 } 740 assertTrue("Timed out in closing the testing region", 741 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 742 } 743 assertTrue(offline); 744 LOG.info("Assigning " + hri); 745 TEST_UTIL.getAdmin().assign(hri.getRegionName()); 746 startTime = EnvironmentEdgeManager.currentTime(); 747 while (true) { 748 rs = cluster.getRegionServer(cluster.getServerWith(regionName)); 749 if (rs != null && rs.getOnlineRegion(regionName) != null) { 750 offline = false; 751 break; 752 } 753 assertTrue("Timed out in open the testing region", 754 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 755 } 756 assertFalse(offline); 757 758 // c0:0, c1:1 759 kvListExp = new ArrayList<>(); 760 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE)); 761 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE)); 762 result = scanner.next(); 763 verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); 764 } 765 766 @Test 767 public void testAsyncScannerWithSmallData() throws Exception { 768 testAsyncScanner(name.getTableName(), 769 2, 770 3, 771 10, 772 -1, 773 null); 774 } 775 776 @Test 777 public void testAsyncScannerWithManyRows() throws Exception { 778 testAsyncScanner(name.getTableName(), 779 30000, 780 1, 781 1, 782 -1, 783 null); 784 } 785 786 @Test 787 public void testAsyncScannerWithoutCaching() throws Exception { 788 testAsyncScanner(name.getTableName(), 789 5, 790 1, 791 1, 792 1, 793 (b) -> { 794 try { 795 TimeUnit.MILLISECONDS.sleep(500); 796 } catch (InterruptedException ex) { 797 } 798 }); 799 } 800 801 private void testAsyncScanner(TableName table, int rowNumber, int familyNumber, 802 int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception { 803 assert rowNumber > 0; 804 assert familyNumber > 0; 805 assert qualifierNumber > 0; 806 byte[] row = Bytes.toBytes("r"); 807 byte[] family = Bytes.toBytes("f"); 808 byte[] qualifier = Bytes.toBytes("q"); 809 byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber); 810 byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber); 811 byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber); 812 813 Table ht = TEST_UTIL.createTable(table, families); 814 815 boolean toLog = true; 816 List<Cell> kvListExp = new ArrayList<>(); 817 818 List<Put> puts = new ArrayList<>(); 819 for (byte[] r : rows) { 820 Put put = new Put(r); 821 for (byte[] f : families) { 822 for (byte[] q : qualifiers) { 823 KeyValue kv = new KeyValue(r, f, q, 1, VALUE); 824 put.add(kv); 825 kvListExp.add(kv); 826 } 827 } 828 puts.add(put); 829 if (puts.size() > 1000) { 830 ht.put(puts); 831 puts.clear(); 832 } 833 } 834 if (!puts.isEmpty()) { 835 ht.put(puts); 836 puts.clear(); 837 } 838 839 Scan scan = new Scan(); 840 scan.setAsyncPrefetch(true); 841 if (caching > 0) { 842 scan.setCaching(caching); 843 } 844 try (ResultScanner scanner = ht.getScanner(scan)) { 845 assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); 846 ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener); 847 List<Cell> kvListScan = new ArrayList<>(); 848 Result result; 849 boolean first = true; 850 int actualRows = 0; 851 while ((result = scanner.next()) != null) { 852 ++actualRows; 853 // waiting for cache. see HBASE-17376 854 if (first) { 855 TimeUnit.SECONDS.sleep(1); 856 first = false; 857 } 858 for (Cell kv : result.listCells()) { 859 kvListScan.add(kv); 860 } 861 } 862 assertEquals(rowNumber, actualRows); 863 // These cells may have different rows but it is ok. The Result#getRow 864 // isn't used in the verifyResult() 865 result = Result.create(kvListScan); 866 verifyResult(result, kvListExp, toLog, "Testing async scan"); 867 } 868 869 TEST_UTIL.deleteTable(table); 870 } 871 872 private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) { 873 int maxLength = Integer.toString(n).length(); 874 byte [][] ret = new byte[n][]; 875 for (int i = 0; i < n; i++) { 876 int length = Integer.toString(i).length(); 877 StringBuilder buf = new StringBuilder(Integer.toString(i)); 878 IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0")); 879 byte[] tail = Bytes.toBytes(buf.toString()); 880 ret[i] = Bytes.add(base, tail); 881 } 882 return ret; 883 } 884 885 static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, 886 String msg) { 887 888 LOG.info(msg); 889 LOG.info("Expected count: " + expKvList.size()); 890 LOG.info("Actual count: " + result.size()); 891 if (expKvList.isEmpty()) { 892 return; 893 } 894 895 int i = 0; 896 for (Cell kv : result.rawCells()) { 897 if (i >= expKvList.size()) { 898 break; // we will check the size later 899 } 900 901 Cell kvExp = expKvList.get(i++); 902 if (toLog) { 903 LOG.info("get kv is: " + kv.toString()); 904 LOG.info("exp kv is: " + kvExp.toString()); 905 } 906 assertTrue("Not equal", kvExp.equals(kv)); 907 } 908 909 assertEquals(expKvList.size(), result.size()); 910 } 911 912 @Test 913 public void testReadExpiredDataForRawScan() throws IOException { 914 TableName tableName = name.getTableName(); 915 long ts = System.currentTimeMillis() - 10000; 916 byte[] value = Bytes.toBytes("expired"); 917 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 918 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value)); 919 assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)); 920 TEST_UTIL.getAdmin().modifyColumnFamily(tableName, 921 new HColumnDescriptor(FAMILY).setTimeToLive(5)); 922 try (ResultScanner scanner = table.getScanner(FAMILY)) { 923 assertNull(scanner.next()); 924 } 925 try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) { 926 assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER)); 927 assertNull(scanner.next()); 928 } 929 } 930 } 931 932 @Test 933 public void testScanWithColumnsAndFilterAndVersion() throws IOException { 934 TableName tableName = name.getTableName(); 935 long now = System.currentTimeMillis(); 936 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) { 937 for (int i = 0; i < 4; i++) { 938 Put put = new Put(ROW); 939 put.addColumn(FAMILY, QUALIFIER, now + i, VALUE); 940 table.put(put); 941 } 942 943 Scan scan = new Scan(); 944 scan.addColumn(FAMILY, QUALIFIER); 945 scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER))); 946 scan.readVersions(3); 947 948 try (ResultScanner scanner = table.getScanner(scan)) { 949 Result result = scanner.next(); 950 assertEquals(3, result.size()); 951 } 952 } 953 } 954 955 @Test 956 public void testScanWithSameStartRowStopRow() throws IOException { 957 TableName tableName = name.getTableName(); 958 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 959 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 960 961 Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW); 962 try (ResultScanner scanner = table.getScanner(scan)) { 963 assertNull(scanner.next()); 964 } 965 966 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true); 967 try (ResultScanner scanner = table.getScanner(scan)) { 968 Result result = scanner.next(); 969 assertNotNull(result); 970 assertArrayEquals(ROW, result.getRow()); 971 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 972 assertNull(scanner.next()); 973 } 974 975 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false); 976 try (ResultScanner scanner = table.getScanner(scan)) { 977 assertNull(scanner.next()); 978 } 979 980 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false); 981 try (ResultScanner scanner = table.getScanner(scan)) { 982 assertNull(scanner.next()); 983 } 984 985 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true); 986 try (ResultScanner scanner = table.getScanner(scan)) { 987 assertNull(scanner.next()); 988 } 989 } 990 } 991 992 @Test 993 public void testReverseScanWithFlush() throws Exception { 994 TableName tableName = name.getTableName(); 995 final int BATCH_SIZE = 10; 996 final int ROWS_TO_INSERT = 100; 997 final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); 998 999 try (Table table = TEST_UTIL.createTable(tableName, FAMILY); 1000 Admin admin = TEST_UTIL.getAdmin()) { 1001 List<Put> putList = new ArrayList<>(); 1002 for (long i = 0; i < ROWS_TO_INSERT; i++) { 1003 Put put = new Put(Bytes.toBytes(i)); 1004 put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); 1005 putList.add(put); 1006 1007 if (putList.size() >= BATCH_SIZE) { 1008 table.put(putList); 1009 admin.flush(tableName); 1010 putList.clear(); 1011 } 1012 } 1013 1014 if (!putList.isEmpty()) { 1015 table.put(putList); 1016 admin.flush(tableName); 1017 putList.clear(); 1018 } 1019 1020 Scan scan = new Scan(); 1021 scan.setReversed(true); 1022 int count = 0; 1023 1024 try (ResultScanner results = table.getScanner(scan)) { 1025 for (Result result : results) { 1026 count++; 1027 } 1028 } 1029 assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, 1030 ROWS_TO_INSERT, count); 1031 } 1032 } 1033 1034 @Test 1035 public void testScannerWithPartialResults() throws Exception { 1036 TableName tableName = TableName.valueOf("testScannerWithPartialResults"); 1037 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, 1038 Bytes.toBytes("c"), 4)) { 1039 List<Put> puts = new ArrayList<>(); 1040 byte[] largeArray = new byte[10000]; 1041 Put put = new Put(Bytes.toBytes("aaaa0")); 1042 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 1043 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2")); 1044 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3")); 1045 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4")); 1046 puts.add(put); 1047 put = new Put(Bytes.toBytes("aaaa1")); 1048 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 1049 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray); 1050 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray); 1051 puts.add(put); 1052 table.put(puts); 1053 Scan scan = new Scan(); 1054 scan.addFamily(Bytes.toBytes("c")); 1055 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName()); 1056 scan.setMaxResultSize(10001); 1057 scan.setStopRow(Bytes.toBytes("bbbb")); 1058 scan.setFilter(new LimitKVsReturnFilter()); 1059 ResultScanner rs = table.getScanner(scan); 1060 Result result; 1061 int expectedKvNumber = 6; 1062 int returnedKvNumber = 0; 1063 while((result = rs.next()) != null) { 1064 returnedKvNumber += result.listCells().size(); 1065 } 1066 rs.close(); 1067 assertEquals(expectedKvNumber, returnedKvNumber); 1068 } 1069 } 1070 1071 public static class LimitKVsReturnFilter extends FilterBase { 1072 1073 private int cellCount = 0; 1074 1075 @Override 1076 public ReturnCode filterCell(Cell v) throws IOException { 1077 if (cellCount >= 6) { 1078 cellCount++; 1079 return ReturnCode.SKIP; 1080 } 1081 cellCount++; 1082 return ReturnCode.INCLUDE; 1083 } 1084 1085 @Override 1086 public boolean filterAllRemaining() throws IOException { 1087 if (cellCount < 7) { 1088 return false; 1089 } 1090 cellCount++; 1091 return true; 1092 } 1093 1094 @Override 1095 public String toString() { 1096 return this.getClass().getSimpleName(); 1097 } 1098 1099 public static LimitKVsReturnFilter parseFrom(final byte [] pbBytes) 1100 throws DeserializationException { 1101 return new LimitKVsReturnFilter(); 1102 } 1103 } 1104}