001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; 021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue; 022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; 023import static org.junit.Assert.assertArrayEquals; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.List; 036import java.util.concurrent.TimeUnit; 037import java.util.function.Consumer; 038import java.util.stream.IntStream; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.Cell; 041import org.apache.hadoop.hbase.CompareOperator; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HBaseTestingUtility; 044import org.apache.hadoop.hbase.HColumnDescriptor; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HRegionInfo; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.HTestConst; 049import org.apache.hadoop.hbase.KeyValue; 050import org.apache.hadoop.hbase.MiniHBaseCluster; 051import org.apache.hadoop.hbase.StartMiniClusterOption; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.TableNameTestRule; 054import org.apache.hadoop.hbase.TableNotFoundException; 055import org.apache.hadoop.hbase.exceptions.DeserializationException; 056import org.apache.hadoop.hbase.filter.BinaryComparator; 057import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 058import org.apache.hadoop.hbase.filter.ColumnRangeFilter; 059import org.apache.hadoop.hbase.filter.FilterBase; 060import org.apache.hadoop.hbase.filter.QualifierFilter; 061import org.apache.hadoop.hbase.regionserver.HRegionServer; 062import org.apache.hadoop.hbase.testclassification.ClientTests; 063import org.apache.hadoop.hbase.testclassification.MediumTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.junit.AfterClass; 067import org.junit.Before; 068import org.junit.ClassRule; 069import org.junit.Rule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.junit.runner.RunWith; 073import org.junit.runners.Parameterized; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 078 079/** 080 * A client-side test, mostly testing scanners with various parameters. Parameterized on different 081 * registry implementations. 082 */ 083@Category({ MediumTests.class, ClientTests.class }) 084@RunWith(Parameterized.class) 085public class TestScannersFromClientSide { 086 087 @ClassRule 088 public static final HBaseClassTestRule CLASS_RULE = 089 HBaseClassTestRule.forClass(TestScannersFromClientSide.class); 090 091 private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); 092 093 private static HBaseTestingUtility TEST_UTIL; 094 private static byte[] ROW = Bytes.toBytes("testRow"); 095 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 096 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 097 private static byte[] VALUE = Bytes.toBytes("testValue"); 098 099 @Rule 100 public TableNameTestRule name = new TableNameTestRule(); 101 102 /** 103 * @throws java.lang.Exception 104 */ 105 @AfterClass 106 public static void tearDownAfterClass() throws Exception { 107 if (TEST_UTIL != null) { 108 TEST_UTIL.shutdownMiniCluster(); 109 } 110 } 111 112 /** 113 * @throws java.lang.Exception 114 */ 115 @Before 116 public void setUp() throws Exception { 117 // Nothing to do. 118 } 119 120 @Parameterized.Parameters 121 public static Collection<Object[]> parameters() { 122 return Arrays.asList(new Object[][] { { MasterRegistry.class, 1 }, { MasterRegistry.class, 2 }, 123 { ZKConnectionRegistry.class, 1 } }); 124 } 125 126 /** 127 * JUnit does not provide an easy way to run a hook after each parameterized run. Without that 128 * there is no easy way to restart the test cluster after each parameterized run. Annotation 129 * BeforeParam does not work either because it runs before parameterization and hence does not 130 * have access to the test parameters (which is weird). This *hack* checks if the current instance 131 * of test cluster configuration has the passed parameterized configs. In such a case, we can just 132 * reuse the cluster for test and do not need to initialize from scratch. While this is a hack, it 133 * saves a ton of time for the full test and de-flakes it. 134 */ 135 private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) { 136 // initialize() is called for every unit test, however we only want to reset the cluster state 137 // at the end of every parameterized run. 138 if (TEST_UTIL == null) { 139 return false; 140 } 141 Configuration conf = TEST_UTIL.getConfiguration(); 142 Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 143 ZKConnectionRegistry.class); 144 int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 145 AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT); 146 return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; 147 } 148 149 public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception { 150 if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { 151 return; 152 } 153 if (TEST_UTIL != null) { 154 // We reached the end of a parameterized run, clean up the cluster. 155 TEST_UTIL.shutdownMiniCluster(); 156 } 157 TEST_UTIL = new HBaseTestingUtility(); 158 Configuration conf = TEST_UTIL.getConfiguration(); 159 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); 160 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl, 161 ConnectionRegistry.class); 162 Preconditions.checkArgument(numHedgedReqs > 0); 163 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); 164 StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); 165 // Multiple masters needed only when hedged reads for master registry are enabled. 166 builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3); 167 TEST_UTIL.startMiniCluster(builder.build()); 168 } 169 170 /** 171 * Test from client side for batch of scan n 172 */ 173 @Test 174 public void testScanBatch() throws Exception { 175 final TableName tableName = name.getTableName(); 176 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); 177 178 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 179 180 Put put; 181 Scan scan; 182 Delete delete; 183 Result result; 184 ResultScanner scanner; 185 boolean toLog = true; 186 List<Cell> kvListExp; 187 188 // table: row, family, c0:0, c1:1, ... , c7:7 189 put = new Put(ROW); 190 for (int i = 0; i < QUALIFIERS.length; i++) { 191 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 192 put.add(kv); 193 } 194 ht.put(put); 195 196 // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7 197 put = new Put(ROW); 198 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE); 199 put.add(kv); 200 ht.put(put); 201 202 // delete upto ts: 3 203 delete = new Delete(ROW); 204 delete.addFamily(FAMILY, 3); 205 ht.delete(delete); 206 207 // without batch 208 scan = new Scan().withStartRow(ROW); 209 scan.setMaxVersions(); 210 scanner = ht.getScanner(scan); 211 212 // c4:4, c5:5, c6:6, c7:7 213 kvListExp = new ArrayList<>(); 214 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 215 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 216 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 217 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 218 result = scanner.next(); 219 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 220 221 // with batch 222 scan = new Scan().withStartRow(ROW); 223 scan.setMaxVersions(); 224 scan.setBatch(2); 225 scanner = ht.getScanner(scan); 226 227 // First batch: c4:4, c5:5 228 kvListExp = new ArrayList<>(); 229 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 230 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 231 result = scanner.next(); 232 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 233 234 // Second batch: c6:6, c7:7 235 kvListExp = new ArrayList<>(); 236 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 237 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 238 result = scanner.next(); 239 verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); 240 241 } 242 243 @Test 244 public void testMaxResultSizeIsSetToDefault() throws Exception { 245 final TableName tableName = name.getTableName(); 246 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 247 248 // The max result size we expect the scan to use by default. 249 long expectedMaxResultSize = 250 TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 251 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 252 253 int numRows = 5; 254 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 255 256 int numQualifiers = 10; 257 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 258 259 // Specify the cell size such that a single row will be larger than the default 260 // value of maxResultSize. This means that Scan RPCs should return at most a single 261 // result back to the client. 262 int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); 263 byte[] cellValue = Bytes.createMaxByteArray(cellSize); 264 265 Put put; 266 List<Put> puts = new ArrayList<>(); 267 for (int row = 0; row < ROWS.length; row++) { 268 put = new Put(ROWS[row]); 269 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 270 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue); 271 put.add(kv); 272 } 273 puts.add(put); 274 } 275 ht.put(puts); 276 277 // Create a scan with the default configuration. 278 Scan scan = new Scan(); 279 280 ResultScanner scanner = ht.getScanner(scan); 281 assertTrue(scanner instanceof ClientScanner); 282 ClientScanner clientScanner = (ClientScanner) scanner; 283 284 // Call next to issue a single RPC to the server 285 scanner.next(); 286 287 // The scanner should have, at most, a single result in its cache. If there more results exists 288 // in the cache it means that more than the expected max result size was fetched. 289 assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results", 290 clientScanner.getCacheSize() <= 1); 291 } 292 293 /** 294 * Scan on not existing table should throw the exception with correct message 295 */ 296 @Test 297 public void testScannerForNotExistingTable() { 298 String[] tableNames = { "A", "Z", "A:A", "Z:Z" }; 299 for (String tableName : tableNames) { 300 try { 301 Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName)); 302 testSmallScan(table, true, 1, 5); 303 fail("TableNotFoundException was not thrown"); 304 } catch (TableNotFoundException e) { 305 // We expect that the message for TableNotFoundException would have only the table name only 306 // Otherwise that would mean that localeRegionInMeta doesn't work properly 307 assertEquals(e.getMessage(), tableName); 308 } catch (Exception e) { 309 fail("Unexpected exception " + e.getMessage()); 310 } 311 } 312 } 313 314 @Test 315 public void testSmallScan() throws Exception { 316 final TableName tableName = name.getTableName(); 317 318 int numRows = 10; 319 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 320 321 int numQualifiers = 10; 322 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 323 324 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 325 326 Put put; 327 List<Put> puts = new ArrayList<>(); 328 for (int row = 0; row < ROWS.length; row++) { 329 put = new Put(ROWS[row]); 330 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 331 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); 332 put.add(kv); 333 } 334 puts.add(put); 335 } 336 ht.put(puts); 337 338 int expectedRows = numRows; 339 int expectedCols = numRows * numQualifiers; 340 341 // Test normal and reversed 342 testSmallScan(ht, true, expectedRows, expectedCols); 343 testSmallScan(ht, false, expectedRows, expectedCols); 344 } 345 346 /** 347 * Run through a variety of test configurations with a small scan nnnnn 348 */ 349 private void testSmallScan(Table table, boolean reversed, int rows, int columns) 350 throws Exception { 351 Scan baseScan = new Scan(); 352 baseScan.setReversed(reversed); 353 baseScan.setSmall(true); 354 355 Scan scan = new Scan(baseScan); 356 verifyExpectedCounts(table, scan, rows, columns); 357 358 scan = new Scan(baseScan); 359 scan.setMaxResultSize(1); 360 verifyExpectedCounts(table, scan, rows, columns); 361 362 scan = new Scan(baseScan); 363 scan.setMaxResultSize(1); 364 scan.setCaching(Integer.MAX_VALUE); 365 verifyExpectedCounts(table, scan, rows, columns); 366 } 367 368 private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, 369 int expectedCellCount) throws Exception { 370 ResultScanner scanner = table.getScanner(scan); 371 372 int rowCount = 0; 373 int cellCount = 0; 374 Result r = null; 375 while ((r = scanner.next()) != null) { 376 rowCount++; 377 cellCount += r.rawCells().length; 378 } 379 380 assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, 381 expectedRowCount == rowCount); 382 assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, 383 expectedCellCount == cellCount); 384 scanner.close(); 385 } 386 387 /** 388 * Test from client side for get with maxResultPerCF set n 389 */ 390 @Test 391 public void testGetMaxResults() throws Exception { 392 final TableName tableName = name.getTableName(); 393 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 394 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 395 396 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 397 398 Get get; 399 Put put; 400 Result result; 401 boolean toLog = true; 402 List<Cell> kvListExp; 403 404 kvListExp = new ArrayList<>(); 405 // Insert one CF for row[0] 406 put = new Put(ROW); 407 for (int i = 0; i < 10; i++) { 408 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 409 put.add(kv); 410 kvListExp.add(kv); 411 } 412 ht.put(put); 413 414 get = new Get(ROW); 415 result = ht.get(get); 416 verifyResult(result, kvListExp, toLog, "Testing without setting maxResults"); 417 418 get = new Get(ROW); 419 get.setMaxResultsPerColumnFamily(2); 420 result = ht.get(get); 421 kvListExp = new ArrayList<>(); 422 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE)); 423 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 424 verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults"); 425 426 // Filters: ColumnRangeFilter 427 get = new Get(ROW); 428 get.setMaxResultsPerColumnFamily(5); 429 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true)); 430 result = ht.get(get); 431 kvListExp = new ArrayList<>(); 432 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE)); 433 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 434 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 435 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 436 verifyResult(result, kvListExp, toLog, "Testing single CF with CRF"); 437 438 // Insert two more CF for row[0] 439 // 20 columns for CF2, 10 columns for CF1 440 put = new Put(ROW); 441 for (int i = 0; i < QUALIFIERS.length; i++) { 442 KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE); 443 put.add(kv); 444 } 445 ht.put(put); 446 447 put = new Put(ROW); 448 for (int i = 0; i < 10; i++) { 449 KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE); 450 put.add(kv); 451 } 452 ht.put(put); 453 454 get = new Get(ROW); 455 get.setMaxResultsPerColumnFamily(12); 456 get.addFamily(FAMILIES[1]); 457 get.addFamily(FAMILIES[2]); 458 result = ht.get(get); 459 kvListExp = new ArrayList<>(); 460 // Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19 461 for (int i = 0; i < 10; i++) { 462 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 463 } 464 for (int i = 0; i < 2; i++) { 465 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 466 } 467 for (int i = 10; i < 20; i++) { 468 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 469 } 470 verifyResult(result, kvListExp, toLog, "Testing multiple CFs"); 471 472 // Filters: ColumnRangeFilter and ColumnPrefixFilter 473 get = new Get(ROW); 474 get.setMaxResultsPerColumnFamily(3); 475 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true)); 476 result = ht.get(get); 477 kvListExp = new ArrayList<>(); 478 for (int i = 2; i < 5; i++) { 479 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 480 } 481 for (int i = 2; i < 5; i++) { 482 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 483 } 484 for (int i = 2; i < 5; i++) { 485 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 486 } 487 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF"); 488 489 get = new Get(ROW); 490 get.setMaxResultsPerColumnFamily(7); 491 get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1])); 492 result = ht.get(get); 493 kvListExp = new ArrayList<>(); 494 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 495 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE)); 496 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE)); 497 for (int i = 10; i < 16; i++) { 498 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 499 } 500 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF"); 501 502 } 503 504 /** 505 * Test from client side for scan with maxResultPerCF set n 506 */ 507 @Test 508 public void testScanMaxResults() throws Exception { 509 final TableName tableName = name.getTableName(); 510 byte[][] ROWS = HTestConst.makeNAscii(ROW, 2); 511 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 512 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); 513 514 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 515 516 Put put; 517 Scan scan; 518 Result result; 519 boolean toLog = true; 520 List<Cell> kvListExp, kvListScan; 521 522 kvListExp = new ArrayList<>(); 523 524 for (int r = 0; r < ROWS.length; r++) { 525 put = new Put(ROWS[r]); 526 for (int c = 0; c < FAMILIES.length; c++) { 527 for (int q = 0; q < QUALIFIERS.length; q++) { 528 KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); 529 put.add(kv); 530 if (q < 4) { 531 kvListExp.add(kv); 532 } 533 } 534 } 535 ht.put(put); 536 } 537 538 scan = new Scan(); 539 scan.setMaxResultsPerColumnFamily(4); 540 ResultScanner scanner = ht.getScanner(scan); 541 kvListScan = new ArrayList<>(); 542 while ((result = scanner.next()) != null) { 543 for (Cell kv : result.listCells()) { 544 kvListScan.add(kv); 545 } 546 } 547 result = Result.create(kvListScan); 548 verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); 549 550 } 551 552 /** 553 * Test from client side for get with rowOffset n 554 */ 555 @Test 556 public void testGetRowOffset() throws Exception { 557 final TableName tableName = name.getTableName(); 558 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 559 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 560 561 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 562 563 Get get; 564 Put put; 565 Result result; 566 boolean toLog = true; 567 List<Cell> kvListExp; 568 569 // Insert one CF for row 570 kvListExp = new ArrayList<>(); 571 put = new Put(ROW); 572 for (int i = 0; i < 10; i++) { 573 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 574 put.add(kv); 575 // skipping first two kvs 576 if (i < 2) { 577 continue; 578 } 579 kvListExp.add(kv); 580 } 581 ht.put(put); 582 583 // setting offset to 2 584 get = new Get(ROW); 585 get.setRowOffsetPerColumnFamily(2); 586 result = ht.get(get); 587 verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset"); 588 589 // setting offset to 20 590 get = new Get(ROW); 591 get.setRowOffsetPerColumnFamily(20); 592 result = ht.get(get); 593 kvListExp = new ArrayList<>(); 594 verifyResult(result, kvListExp, toLog, "Testing offset > #kvs"); 595 596 // offset + maxResultPerCF 597 get = new Get(ROW); 598 get.setRowOffsetPerColumnFamily(4); 599 get.setMaxResultsPerColumnFamily(5); 600 result = ht.get(get); 601 kvListExp = new ArrayList<>(); 602 for (int i = 4; i < 9; i++) { 603 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 604 } 605 verifyResult(result, kvListExp, toLog, "Testing offset + setMaxResultsPerCF"); 606 607 // Filters: ColumnRangeFilter 608 get = new Get(ROW); 609 get.setRowOffsetPerColumnFamily(1); 610 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true)); 611 result = ht.get(get); 612 kvListExp = new ArrayList<>(); 613 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 614 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 615 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 616 verifyResult(result, kvListExp, toLog, "Testing offset with CRF"); 617 618 // Insert into two more CFs for row 619 // 10 columns for CF2, 10 columns for CF1 620 for (int j = 2; j > 0; j--) { 621 put = new Put(ROW); 622 for (int i = 0; i < 10; i++) { 623 KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE); 624 put.add(kv); 625 } 626 ht.put(put); 627 } 628 629 get = new Get(ROW); 630 get.setRowOffsetPerColumnFamily(4); 631 get.setMaxResultsPerColumnFamily(2); 632 get.addFamily(FAMILIES[1]); 633 get.addFamily(FAMILIES[2]); 634 result = ht.get(get); 635 kvListExp = new ArrayList<>(); 636 // Exp: CF1:q4, q5, CF2: q4, q5 637 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE)); 638 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE)); 639 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE)); 640 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); 641 verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults"); 642 } 643 644 @Test 645 public void testScanRawDeleteFamilyVersion() throws Exception { 646 TableName tableName = name.getTableName(); 647 TEST_UTIL.createTable(tableName, FAMILY); 648 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 649 conf.set(RPC_CODEC_CONF_KEY, ""); 650 conf.set(DEFAULT_CODEC_CLASS, ""); 651 try (Connection connection = ConnectionFactory.createConnection(conf); 652 Table table = connection.getTable(tableName)) { 653 Delete delete = new Delete(ROW); 654 delete.addFamilyVersion(FAMILY, 0L); 655 table.delete(delete); 656 Scan scan = new Scan(ROW).setRaw(true); 657 ResultScanner scanner = table.getScanner(scan); 658 int count = 0; 659 while (scanner.next() != null) { 660 count++; 661 } 662 assertEquals(1, count); 663 } finally { 664 TEST_UTIL.deleteTable(tableName); 665 } 666 } 667 668 /** 669 * Test from client side for scan while the region is reopened on the same region server. n 670 */ 671 @Test 672 public void testScanOnReopenedRegion() throws Exception { 673 final TableName tableName = name.getTableName(); 674 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2); 675 676 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 677 678 Put put; 679 Scan scan; 680 Result result; 681 ResultScanner scanner; 682 boolean toLog = false; 683 List<Cell> kvListExp; 684 685 // table: row, family, c0:0, c1:1 686 put = new Put(ROW); 687 for (int i = 0; i < QUALIFIERS.length; i++) { 688 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 689 put.add(kv); 690 } 691 ht.put(put); 692 693 scan = new Scan().withStartRow(ROW); 694 scanner = ht.getScanner(scan); 695 696 HRegionLocation loc; 697 698 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 699 loc = locator.getRegionLocation(ROW); 700 } 701 HRegionInfo hri = loc.getRegionInfo(); 702 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 703 byte[] regionName = hri.getRegionName(); 704 int i = cluster.getServerWith(regionName); 705 HRegionServer rs = cluster.getRegionServer(i); 706 LOG.info("Unassigning " + hri); 707 TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true); 708 long startTime = EnvironmentEdgeManager.currentTime(); 709 long timeOut = 10000; 710 boolean offline = false; 711 while (true) { 712 if (rs.getOnlineRegion(regionName) == null) { 713 offline = true; 714 break; 715 } 716 assertTrue("Timed out in closing the testing region", 717 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 718 } 719 assertTrue(offline); 720 LOG.info("Assigning " + hri); 721 TEST_UTIL.getAdmin().assign(hri.getRegionName()); 722 startTime = EnvironmentEdgeManager.currentTime(); 723 while (true) { 724 rs = cluster.getRegionServer(cluster.getServerWith(regionName)); 725 if (rs != null && rs.getOnlineRegion(regionName) != null) { 726 offline = false; 727 break; 728 } 729 assertTrue("Timed out in open the testing region", 730 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 731 } 732 assertFalse(offline); 733 734 // c0:0, c1:1 735 kvListExp = new ArrayList<>(); 736 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE)); 737 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE)); 738 result = scanner.next(); 739 verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); 740 } 741 742 @Test 743 public void testAsyncScannerWithSmallData() throws Exception { 744 testAsyncScanner(name.getTableName(), 2, 3, 10, -1, null); 745 } 746 747 @Test 748 public void testAsyncScannerWithManyRows() throws Exception { 749 testAsyncScanner(name.getTableName(), 30000, 1, 1, -1, null); 750 } 751 752 @Test 753 public void testAsyncScannerWithoutCaching() throws Exception { 754 testAsyncScanner(name.getTableName(), 5, 1, 1, 1, (b) -> { 755 try { 756 TimeUnit.MILLISECONDS.sleep(500); 757 } catch (InterruptedException ex) { 758 } 759 }); 760 } 761 762 private void testAsyncScanner(TableName table, int rowNumber, int familyNumber, 763 int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception { 764 assert rowNumber > 0; 765 assert familyNumber > 0; 766 assert qualifierNumber > 0; 767 byte[] row = Bytes.toBytes("r"); 768 byte[] family = Bytes.toBytes("f"); 769 byte[] qualifier = Bytes.toBytes("q"); 770 byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber); 771 byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber); 772 byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber); 773 774 Table ht = TEST_UTIL.createTable(table, families); 775 776 boolean toLog = true; 777 List<Cell> kvListExp = new ArrayList<>(); 778 779 List<Put> puts = new ArrayList<>(); 780 for (byte[] r : rows) { 781 Put put = new Put(r); 782 for (byte[] f : families) { 783 for (byte[] q : qualifiers) { 784 KeyValue kv = new KeyValue(r, f, q, 1, VALUE); 785 put.add(kv); 786 kvListExp.add(kv); 787 } 788 } 789 puts.add(put); 790 if (puts.size() > 1000) { 791 ht.put(puts); 792 puts.clear(); 793 } 794 } 795 if (!puts.isEmpty()) { 796 ht.put(puts); 797 puts.clear(); 798 } 799 800 Scan scan = new Scan(); 801 scan.setAsyncPrefetch(true); 802 if (caching > 0) { 803 scan.setCaching(caching); 804 } 805 try (ResultScanner scanner = ht.getScanner(scan)) { 806 assertTrue("Not instance of async scanner", scanner instanceof ClientAsyncPrefetchScanner); 807 ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener); 808 List<Cell> kvListScan = new ArrayList<>(); 809 Result result; 810 boolean first = true; 811 int actualRows = 0; 812 while ((result = scanner.next()) != null) { 813 ++actualRows; 814 // waiting for cache. see HBASE-17376 815 if (first) { 816 TimeUnit.SECONDS.sleep(1); 817 first = false; 818 } 819 for (Cell kv : result.listCells()) { 820 kvListScan.add(kv); 821 } 822 } 823 assertEquals(rowNumber, actualRows); 824 // These cells may have different rows but it is ok. The Result#getRow 825 // isn't used in the verifyResult() 826 result = Result.create(kvListScan); 827 verifyResult(result, kvListExp, toLog, "Testing async scan"); 828 } 829 830 TEST_UTIL.deleteTable(table); 831 } 832 833 private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) { 834 int maxLength = Integer.toString(n).length(); 835 byte[][] ret = new byte[n][]; 836 for (int i = 0; i < n; i++) { 837 int length = Integer.toString(i).length(); 838 StringBuilder buf = new StringBuilder(Integer.toString(i)); 839 IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0")); 840 byte[] tail = Bytes.toBytes(buf.toString()); 841 ret[i] = Bytes.add(base, tail); 842 } 843 return ret; 844 } 845 846 static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, String msg) { 847 848 LOG.info(msg); 849 LOG.info("Expected count: " + expKvList.size()); 850 LOG.info("Actual count: " + result.size()); 851 if (expKvList.isEmpty()) { 852 return; 853 } 854 855 int i = 0; 856 for (Cell kv : result.rawCells()) { 857 if (i >= expKvList.size()) { 858 break; // we will check the size later 859 } 860 861 Cell kvExp = expKvList.get(i++); 862 if (toLog) { 863 LOG.info("get kv is: " + kv.toString()); 864 LOG.info("exp kv is: " + kvExp.toString()); 865 } 866 assertTrue("Not equal", kvExp.equals(kv)); 867 } 868 869 assertEquals(expKvList.size(), result.size()); 870 } 871 872 @Test 873 public void testReadExpiredDataForRawScan() throws IOException { 874 TableName tableName = name.getTableName(); 875 long ts = EnvironmentEdgeManager.currentTime() - 10000; 876 byte[] value = Bytes.toBytes("expired"); 877 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 878 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value)); 879 assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)); 880 TEST_UTIL.getAdmin().modifyColumnFamily(tableName, 881 new HColumnDescriptor(FAMILY).setTimeToLive(5)); 882 try (ResultScanner scanner = table.getScanner(FAMILY)) { 883 assertNull(scanner.next()); 884 } 885 try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) { 886 assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER)); 887 assertNull(scanner.next()); 888 } 889 } 890 } 891 892 @Test 893 public void testScanWithColumnsAndFilterAndVersion() throws IOException { 894 TableName tableName = name.getTableName(); 895 long now = EnvironmentEdgeManager.currentTime(); 896 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) { 897 for (int i = 0; i < 4; i++) { 898 Put put = new Put(ROW); 899 put.addColumn(FAMILY, QUALIFIER, now + i, VALUE); 900 table.put(put); 901 } 902 903 Scan scan = new Scan(); 904 scan.addColumn(FAMILY, QUALIFIER); 905 scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER))); 906 scan.readVersions(3); 907 908 try (ResultScanner scanner = table.getScanner(scan)) { 909 Result result = scanner.next(); 910 assertEquals(3, result.size()); 911 } 912 } 913 } 914 915 @Test 916 public void testScanWithSameStartRowStopRow() throws IOException { 917 TableName tableName = name.getTableName(); 918 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 919 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 920 921 Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW); 922 try (ResultScanner scanner = table.getScanner(scan)) { 923 assertNull(scanner.next()); 924 } 925 926 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true); 927 try (ResultScanner scanner = table.getScanner(scan)) { 928 Result result = scanner.next(); 929 assertNotNull(result); 930 assertArrayEquals(ROW, result.getRow()); 931 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 932 assertNull(scanner.next()); 933 } 934 935 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false); 936 try (ResultScanner scanner = table.getScanner(scan)) { 937 assertNull(scanner.next()); 938 } 939 940 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false); 941 try (ResultScanner scanner = table.getScanner(scan)) { 942 assertNull(scanner.next()); 943 } 944 945 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true); 946 try (ResultScanner scanner = table.getScanner(scan)) { 947 assertNull(scanner.next()); 948 } 949 } 950 } 951 952 @Test 953 public void testReverseScanWithFlush() throws Exception { 954 TableName tableName = name.getTableName(); 955 final int BATCH_SIZE = 10; 956 final int ROWS_TO_INSERT = 100; 957 final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); 958 959 try (Table table = TEST_UTIL.createTable(tableName, FAMILY); 960 Admin admin = TEST_UTIL.getAdmin()) { 961 List<Put> putList = new ArrayList<>(); 962 for (long i = 0; i < ROWS_TO_INSERT; i++) { 963 Put put = new Put(Bytes.toBytes(i)); 964 put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); 965 putList.add(put); 966 967 if (putList.size() >= BATCH_SIZE) { 968 table.put(putList); 969 admin.flush(tableName); 970 putList.clear(); 971 } 972 } 973 974 if (!putList.isEmpty()) { 975 table.put(putList); 976 admin.flush(tableName); 977 putList.clear(); 978 } 979 980 Scan scan = new Scan(); 981 scan.setReversed(true); 982 int count = 0; 983 984 try (ResultScanner results = table.getScanner(scan)) { 985 for (Result result : results) { 986 count++; 987 } 988 } 989 assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, 990 ROWS_TO_INSERT, count); 991 } 992 } 993 994 @Test 995 public void testScannerWithPartialResults() throws Exception { 996 TableName tableName = TableName.valueOf("testScannerWithPartialResults"); 997 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("c"), 4)) { 998 List<Put> puts = new ArrayList<>(); 999 byte[] largeArray = new byte[10000]; 1000 Put put = new Put(Bytes.toBytes("aaaa0")); 1001 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 1002 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2")); 1003 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3")); 1004 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4")); 1005 puts.add(put); 1006 put = new Put(Bytes.toBytes("aaaa1")); 1007 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 1008 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray); 1009 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray); 1010 puts.add(put); 1011 table.put(puts); 1012 Scan scan = new Scan(); 1013 scan.addFamily(Bytes.toBytes("c")); 1014 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName()); 1015 scan.setMaxResultSize(10001); 1016 scan.setStopRow(Bytes.toBytes("bbbb")); 1017 scan.setFilter(new LimitKVsReturnFilter()); 1018 ResultScanner rs = table.getScanner(scan); 1019 Result result; 1020 int expectedKvNumber = 6; 1021 int returnedKvNumber = 0; 1022 while ((result = rs.next()) != null) { 1023 returnedKvNumber += result.listCells().size(); 1024 } 1025 rs.close(); 1026 assertEquals(expectedKvNumber, returnedKvNumber); 1027 } 1028 } 1029 1030 public static class LimitKVsReturnFilter extends FilterBase { 1031 1032 private int cellCount = 0; 1033 1034 @Override 1035 public ReturnCode filterCell(Cell v) throws IOException { 1036 if (cellCount >= 6) { 1037 cellCount++; 1038 return ReturnCode.SKIP; 1039 } 1040 cellCount++; 1041 return ReturnCode.INCLUDE; 1042 } 1043 1044 @Override 1045 public boolean filterAllRemaining() throws IOException { 1046 if (cellCount < 7) { 1047 return false; 1048 } 1049 cellCount++; 1050 return true; 1051 } 1052 1053 @Override 1054 public String toString() { 1055 return this.getClass().getSimpleName(); 1056 } 1057 1058 public static LimitKVsReturnFilter parseFrom(final byte[] pbBytes) 1059 throws DeserializationException { 1060 return new LimitKVsReturnFilter(); 1061 } 1062 } 1063}