001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; 021import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue; 022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; 023import static org.junit.Assert.assertArrayEquals; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.concurrent.TimeUnit; 035import java.util.function.Consumer; 036import java.util.stream.IntStream; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.CompareOperator; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseTestingUtility; 042import org.apache.hadoop.hbase.HColumnDescriptor; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.HRegionInfo; 045import org.apache.hadoop.hbase.HRegionLocation; 046import org.apache.hadoop.hbase.HTestConst; 047import org.apache.hadoop.hbase.KeyValue; 048import org.apache.hadoop.hbase.MiniHBaseCluster; 049import org.apache.hadoop.hbase.exceptions.DeserializationException; 050import org.apache.hadoop.hbase.filter.FilterBase; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.TableNotFoundException; 054import org.apache.hadoop.hbase.filter.BinaryComparator; 055import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 056import org.apache.hadoop.hbase.filter.ColumnRangeFilter; 057import org.apache.hadoop.hbase.filter.QualifierFilter; 058import org.apache.hadoop.hbase.regionserver.HRegionServer; 059import org.apache.hadoop.hbase.testclassification.ClientTests; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.junit.After; 063import org.junit.AfterClass; 064import org.junit.Before; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Rule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.junit.rules.TestName; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074/** 075 * A client-side test, mostly testing scanners with various parameters. 076 */ 077@Category({MediumTests.class, ClientTests.class}) 078public class TestScannersFromClientSide { 079 080 @ClassRule 081 public static final HBaseClassTestRule CLASS_RULE = 082 HBaseClassTestRule.forClass(TestScannersFromClientSide.class); 083 084 private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); 085 086 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 087 private static byte [] ROW = Bytes.toBytes("testRow"); 088 private static byte [] FAMILY = Bytes.toBytes("testFamily"); 089 private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); 090 private static byte [] VALUE = Bytes.toBytes("testValue"); 091 092 @Rule 093 public TestName name = new TestName(); 094 095 /** 096 * @throws java.lang.Exception 097 */ 098 @BeforeClass 099 public static void setUpBeforeClass() throws Exception { 100 Configuration conf = TEST_UTIL.getConfiguration(); 101 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); 102 TEST_UTIL.startMiniCluster(3); 103 } 104 105 /** 106 * @throws java.lang.Exception 107 */ 108 @AfterClass 109 public static void tearDownAfterClass() throws Exception { 110 TEST_UTIL.shutdownMiniCluster(); 111 } 112 113 /** 114 * @throws java.lang.Exception 115 */ 116 @Before 117 public void setUp() throws Exception { 118 // Nothing to do. 119 } 120 121 /** 122 * @throws java.lang.Exception 123 */ 124 @After 125 public void tearDown() throws Exception { 126 // Nothing to do. 127 } 128 129 /** 130 * Test from client side for batch of scan 131 * 132 * @throws Exception 133 */ 134 @Test 135 public void testScanBatch() throws Exception { 136 final TableName tableName = TableName.valueOf(name.getMethodName()); 137 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); 138 139 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 140 141 Put put; 142 Scan scan; 143 Delete delete; 144 Result result; 145 ResultScanner scanner; 146 boolean toLog = true; 147 List<Cell> kvListExp; 148 149 // table: row, family, c0:0, c1:1, ... , c7:7 150 put = new Put(ROW); 151 for (int i=0; i < QUALIFIERS.length; i++) { 152 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 153 put.add(kv); 154 } 155 ht.put(put); 156 157 // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7 158 put = new Put(ROW); 159 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE); 160 put.add(kv); 161 ht.put(put); 162 163 // delete upto ts: 3 164 delete = new Delete(ROW); 165 delete.addFamily(FAMILY, 3); 166 ht.delete(delete); 167 168 // without batch 169 scan = new Scan().withStartRow(ROW); 170 scan.setMaxVersions(); 171 scanner = ht.getScanner(scan); 172 173 // c4:4, c5:5, c6:6, c7:7 174 kvListExp = new ArrayList<>(); 175 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 176 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 177 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 178 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 179 result = scanner.next(); 180 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 181 182 // with batch 183 scan = new Scan().withStartRow(ROW); 184 scan.setMaxVersions(); 185 scan.setBatch(2); 186 scanner = ht.getScanner(scan); 187 188 // First batch: c4:4, c5:5 189 kvListExp = new ArrayList<>(); 190 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 191 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 192 result = scanner.next(); 193 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 194 195 // Second batch: c6:6, c7:7 196 kvListExp = new ArrayList<>(); 197 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 198 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 199 result = scanner.next(); 200 verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); 201 202 } 203 204 @Test 205 public void testMaxResultSizeIsSetToDefault() throws Exception { 206 final TableName tableName = TableName.valueOf(name.getMethodName()); 207 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 208 209 // The max result size we expect the scan to use by default. 210 long expectedMaxResultSize = 211 TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 212 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 213 214 int numRows = 5; 215 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 216 217 int numQualifiers = 10; 218 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 219 220 // Specify the cell size such that a single row will be larger than the default 221 // value of maxResultSize. This means that Scan RPCs should return at most a single 222 // result back to the client. 223 int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); 224 byte[] cellValue = Bytes.createMaxByteArray(cellSize); 225 226 Put put; 227 List<Put> puts = new ArrayList<>(); 228 for (int row = 0; row < ROWS.length; row++) { 229 put = new Put(ROWS[row]); 230 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 231 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue); 232 put.add(kv); 233 } 234 puts.add(put); 235 } 236 ht.put(puts); 237 238 // Create a scan with the default configuration. 239 Scan scan = new Scan(); 240 241 ResultScanner scanner = ht.getScanner(scan); 242 assertTrue(scanner instanceof ClientScanner); 243 ClientScanner clientScanner = (ClientScanner) scanner; 244 245 // Call next to issue a single RPC to the server 246 scanner.next(); 247 248 // The scanner should have, at most, a single result in its cache. If there more results exists 249 // in the cache it means that more than the expected max result size was fetched. 250 assertTrue("The cache contains: " + clientScanner.getCacheSize() + " results", 251 clientScanner.getCacheSize() <= 1); 252 } 253 254 /** 255 * Scan on not existing table should throw the exception with correct message 256 */ 257 @Test 258 public void testScannerForNotExistingTable() { 259 String[] tableNames = {"A", "Z", "A:A", "Z:Z"}; 260 for(String tableName : tableNames) { 261 try { 262 Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName)); 263 testSmallScan(table, true, 1, 5); 264 fail("TableNotFoundException was not thrown"); 265 } catch (TableNotFoundException e) { 266 // We expect that the message for TableNotFoundException would have only the table name only 267 // Otherwise that would mean that localeRegionInMeta doesn't work properly 268 assertEquals(e.getMessage(), tableName); 269 } catch (Exception e) { 270 fail("Unexpected exception " + e.getMessage()); 271 } 272 } 273 } 274 275 @Test 276 public void testSmallScan() throws Exception { 277 final TableName tableName = TableName.valueOf(name.getMethodName()); 278 279 int numRows = 10; 280 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 281 282 int numQualifiers = 10; 283 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 284 285 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 286 287 Put put; 288 List<Put> puts = new ArrayList<>(); 289 for (int row = 0; row < ROWS.length; row++) { 290 put = new Put(ROWS[row]); 291 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 292 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); 293 put.add(kv); 294 } 295 puts.add(put); 296 } 297 ht.put(puts); 298 299 int expectedRows = numRows; 300 int expectedCols = numRows * numQualifiers; 301 302 // Test normal and reversed 303 testSmallScan(ht, true, expectedRows, expectedCols); 304 testSmallScan(ht, false, expectedRows, expectedCols); 305 } 306 307 /** 308 * Run through a variety of test configurations with a small scan 309 * @param table 310 * @param reversed 311 * @param rows 312 * @param columns 313 * @throws Exception 314 */ 315 private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { 316 Scan baseScan = new Scan(); 317 baseScan.setReversed(reversed); 318 baseScan.setSmall(true); 319 320 Scan scan = new Scan(baseScan); 321 verifyExpectedCounts(table, scan, rows, columns); 322 323 scan = new Scan(baseScan); 324 scan.setMaxResultSize(1); 325 verifyExpectedCounts(table, scan, rows, columns); 326 327 scan = new Scan(baseScan); 328 scan.setMaxResultSize(1); 329 scan.setCaching(Integer.MAX_VALUE); 330 verifyExpectedCounts(table, scan, rows, columns); 331 } 332 333 private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, 334 int expectedCellCount) throws Exception { 335 ResultScanner scanner = table.getScanner(scan); 336 337 int rowCount = 0; 338 int cellCount = 0; 339 Result r = null; 340 while ((r = scanner.next()) != null) { 341 rowCount++; 342 cellCount += r.rawCells().length; 343 } 344 345 assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, 346 expectedRowCount == rowCount); 347 assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, 348 expectedCellCount == cellCount); 349 scanner.close(); 350 } 351 352 /** 353 * Test from client side for get with maxResultPerCF set 354 * 355 * @throws Exception 356 */ 357 @Test 358 public void testGetMaxResults() throws Exception { 359 final TableName tableName = TableName.valueOf(name.getMethodName()); 360 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 361 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 362 363 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 364 365 Get get; 366 Put put; 367 Result result; 368 boolean toLog = true; 369 List<Cell> kvListExp; 370 371 kvListExp = new ArrayList<>(); 372 // Insert one CF for row[0] 373 put = new Put(ROW); 374 for (int i=0; i < 10; i++) { 375 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 376 put.add(kv); 377 kvListExp.add(kv); 378 } 379 ht.put(put); 380 381 get = new Get(ROW); 382 result = ht.get(get); 383 verifyResult(result, kvListExp, toLog, "Testing without setting maxResults"); 384 385 get = new Get(ROW); 386 get.setMaxResultsPerColumnFamily(2); 387 result = ht.get(get); 388 kvListExp = new ArrayList<>(); 389 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE)); 390 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 391 verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults"); 392 393 // Filters: ColumnRangeFilter 394 get = new Get(ROW); 395 get.setMaxResultsPerColumnFamily(5); 396 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], 397 true)); 398 result = ht.get(get); 399 kvListExp = new ArrayList<>(); 400 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE)); 401 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 402 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 403 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 404 verifyResult(result, kvListExp, toLog, "Testing single CF with CRF"); 405 406 // Insert two more CF for row[0] 407 // 20 columns for CF2, 10 columns for CF1 408 put = new Put(ROW); 409 for (int i=0; i < QUALIFIERS.length; i++) { 410 KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE); 411 put.add(kv); 412 } 413 ht.put(put); 414 415 put = new Put(ROW); 416 for (int i=0; i < 10; i++) { 417 KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE); 418 put.add(kv); 419 } 420 ht.put(put); 421 422 get = new Get(ROW); 423 get.setMaxResultsPerColumnFamily(12); 424 get.addFamily(FAMILIES[1]); 425 get.addFamily(FAMILIES[2]); 426 result = ht.get(get); 427 kvListExp = new ArrayList<>(); 428 //Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19 429 for (int i=0; i < 10; i++) { 430 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 431 } 432 for (int i=0; i < 2; i++) { 433 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 434 } 435 for (int i=10; i < 20; i++) { 436 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 437 } 438 verifyResult(result, kvListExp, toLog, "Testing multiple CFs"); 439 440 // Filters: ColumnRangeFilter and ColumnPrefixFilter 441 get = new Get(ROW); 442 get.setMaxResultsPerColumnFamily(3); 443 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true)); 444 result = ht.get(get); 445 kvListExp = new ArrayList<>(); 446 for (int i=2; i < 5; i++) { 447 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 448 } 449 for (int i=2; i < 5; i++) { 450 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 451 } 452 for (int i=2; i < 5; i++) { 453 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 454 } 455 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF"); 456 457 get = new Get(ROW); 458 get.setMaxResultsPerColumnFamily(7); 459 get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1])); 460 result = ht.get(get); 461 kvListExp = new ArrayList<>(); 462 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 463 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE)); 464 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE)); 465 for (int i=10; i < 16; i++) { 466 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 467 } 468 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF"); 469 470 } 471 472 /** 473 * Test from client side for scan with maxResultPerCF set 474 * 475 * @throws Exception 476 */ 477 @Test 478 public void testScanMaxResults() throws Exception { 479 final TableName tableName = TableName.valueOf(name.getMethodName()); 480 byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); 481 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 482 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); 483 484 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 485 486 Put put; 487 Scan scan; 488 Result result; 489 boolean toLog = true; 490 List<Cell> kvListExp, kvListScan; 491 492 kvListExp = new ArrayList<>(); 493 494 for (int r=0; r < ROWS.length; r++) { 495 put = new Put(ROWS[r]); 496 for (int c=0; c < FAMILIES.length; c++) { 497 for (int q=0; q < QUALIFIERS.length; q++) { 498 KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); 499 put.add(kv); 500 if (q < 4) { 501 kvListExp.add(kv); 502 } 503 } 504 } 505 ht.put(put); 506 } 507 508 scan = new Scan(); 509 scan.setMaxResultsPerColumnFamily(4); 510 ResultScanner scanner = ht.getScanner(scan); 511 kvListScan = new ArrayList<>(); 512 while ((result = scanner.next()) != null) { 513 for (Cell kv : result.listCells()) { 514 kvListScan.add(kv); 515 } 516 } 517 result = Result.create(kvListScan); 518 verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); 519 520 } 521 522 /** 523 * Test from client side for get with rowOffset 524 * 525 * @throws Exception 526 */ 527 @Test 528 public void testGetRowOffset() throws Exception { 529 final TableName tableName = TableName.valueOf(name.getMethodName()); 530 byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 531 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 532 533 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 534 535 Get get; 536 Put put; 537 Result result; 538 boolean toLog = true; 539 List<Cell> kvListExp; 540 541 // Insert one CF for row 542 kvListExp = new ArrayList<>(); 543 put = new Put(ROW); 544 for (int i=0; i < 10; i++) { 545 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 546 put.add(kv); 547 // skipping first two kvs 548 if (i < 2) continue; 549 kvListExp.add(kv); 550 } 551 ht.put(put); 552 553 //setting offset to 2 554 get = new Get(ROW); 555 get.setRowOffsetPerColumnFamily(2); 556 result = ht.get(get); 557 verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset"); 558 559 //setting offset to 20 560 get = new Get(ROW); 561 get.setRowOffsetPerColumnFamily(20); 562 result = ht.get(get); 563 kvListExp = new ArrayList<>(); 564 verifyResult(result, kvListExp, toLog, "Testing offset > #kvs"); 565 566 //offset + maxResultPerCF 567 get = new Get(ROW); 568 get.setRowOffsetPerColumnFamily(4); 569 get.setMaxResultsPerColumnFamily(5); 570 result = ht.get(get); 571 kvListExp = new ArrayList<>(); 572 for (int i=4; i < 9; i++) { 573 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 574 } 575 verifyResult(result, kvListExp, toLog, 576 "Testing offset + setMaxResultsPerCF"); 577 578 // Filters: ColumnRangeFilter 579 get = new Get(ROW); 580 get.setRowOffsetPerColumnFamily(1); 581 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], 582 true)); 583 result = ht.get(get); 584 kvListExp = new ArrayList<>(); 585 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 586 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 587 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 588 verifyResult(result, kvListExp, toLog, "Testing offset with CRF"); 589 590 // Insert into two more CFs for row 591 // 10 columns for CF2, 10 columns for CF1 592 for(int j=2; j > 0; j--) { 593 put = new Put(ROW); 594 for (int i=0; i < 10; i++) { 595 KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE); 596 put.add(kv); 597 } 598 ht.put(put); 599 } 600 601 get = new Get(ROW); 602 get.setRowOffsetPerColumnFamily(4); 603 get.setMaxResultsPerColumnFamily(2); 604 get.addFamily(FAMILIES[1]); 605 get.addFamily(FAMILIES[2]); 606 result = ht.get(get); 607 kvListExp = new ArrayList<>(); 608 //Exp: CF1:q4, q5, CF2: q4, q5 609 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE)); 610 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE)); 611 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE)); 612 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); 613 verifyResult(result, kvListExp, toLog, 614 "Testing offset + multiple CFs + maxResults"); 615 } 616 617 @Test 618 public void testScanRawDeleteFamilyVersion() throws Exception { 619 TableName tableName = TableName.valueOf(name.getMethodName()); 620 TEST_UTIL.createTable(tableName, FAMILY); 621 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 622 conf.set(RPC_CODEC_CONF_KEY, ""); 623 conf.set(DEFAULT_CODEC_CLASS, ""); 624 try (Connection connection = ConnectionFactory.createConnection(conf); 625 Table table = connection.getTable(tableName)) { 626 Delete delete = new Delete(ROW); 627 delete.addFamilyVersion(FAMILY, 0L); 628 table.delete(delete); 629 Scan scan = new Scan(ROW).setRaw(true); 630 ResultScanner scanner = table.getScanner(scan); 631 int count = 0; 632 while (scanner.next() != null) { 633 count++; 634 } 635 assertEquals(1, count); 636 } finally { 637 TEST_UTIL.deleteTable(tableName); 638 } 639 } 640 641 /** 642 * Test from client side for scan while the region is reopened 643 * on the same region server. 644 * 645 * @throws Exception 646 */ 647 @Test 648 public void testScanOnReopenedRegion() throws Exception { 649 final TableName tableName = TableName.valueOf(name.getMethodName()); 650 byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2); 651 652 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 653 654 Put put; 655 Scan scan; 656 Result result; 657 ResultScanner scanner; 658 boolean toLog = false; 659 List<Cell> kvListExp; 660 661 // table: row, family, c0:0, c1:1 662 put = new Put(ROW); 663 for (int i=0; i < QUALIFIERS.length; i++) { 664 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 665 put.add(kv); 666 } 667 ht.put(put); 668 669 scan = new Scan().withStartRow(ROW); 670 scanner = ht.getScanner(scan); 671 672 HRegionLocation loc; 673 674 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 675 loc = locator.getRegionLocation(ROW); 676 } 677 HRegionInfo hri = loc.getRegionInfo(); 678 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 679 byte[] regionName = hri.getRegionName(); 680 int i = cluster.getServerWith(regionName); 681 HRegionServer rs = cluster.getRegionServer(i); 682 LOG.info("Unassigning " + hri); 683 TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true); 684 long startTime = EnvironmentEdgeManager.currentTime(); 685 long timeOut = 10000; 686 boolean offline = false; 687 while (true) { 688 if (rs.getOnlineRegion(regionName) == null) { 689 offline = true; 690 break; 691 } 692 assertTrue("Timed out in closing the testing region", 693 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 694 } 695 assertTrue(offline); 696 LOG.info("Assigning " + hri); 697 TEST_UTIL.getAdmin().assign(hri.getRegionName()); 698 startTime = EnvironmentEdgeManager.currentTime(); 699 while (true) { 700 rs = cluster.getRegionServer(cluster.getServerWith(regionName)); 701 if (rs != null && rs.getOnlineRegion(regionName) != null) { 702 offline = false; 703 break; 704 } 705 assertTrue("Timed out in open the testing region", 706 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 707 } 708 assertFalse(offline); 709 710 // c0:0, c1:1 711 kvListExp = new ArrayList<>(); 712 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE)); 713 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE)); 714 result = scanner.next(); 715 verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); 716 } 717 718 @Test 719 public void testAsyncScannerWithSmallData() throws Exception { 720 testAsyncScanner(TableName.valueOf(name.getMethodName()), 721 2, 722 3, 723 10, 724 -1, 725 null); 726 } 727 728 @Test 729 public void testAsyncScannerWithManyRows() throws Exception { 730 testAsyncScanner(TableName.valueOf(name.getMethodName()), 731 30000, 732 1, 733 1, 734 -1, 735 null); 736 } 737 738 @Test 739 public void testAsyncScannerWithoutCaching() throws Exception { 740 testAsyncScanner(TableName.valueOf(name.getMethodName()), 741 5, 742 1, 743 1, 744 1, 745 (b) -> { 746 try { 747 TimeUnit.MILLISECONDS.sleep(500); 748 } catch (InterruptedException ex) { 749 } 750 }); 751 } 752 753 private void testAsyncScanner(TableName table, int rowNumber, int familyNumber, 754 int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception { 755 assert rowNumber > 0; 756 assert familyNumber > 0; 757 assert qualifierNumber > 0; 758 byte[] row = Bytes.toBytes("r"); 759 byte[] family = Bytes.toBytes("f"); 760 byte[] qualifier = Bytes.toBytes("q"); 761 byte[][] rows = makeNAsciiWithZeroPrefix(row, rowNumber); 762 byte[][] families = makeNAsciiWithZeroPrefix(family, familyNumber); 763 byte[][] qualifiers = makeNAsciiWithZeroPrefix(qualifier, qualifierNumber); 764 765 Table ht = TEST_UTIL.createTable(table, families); 766 767 boolean toLog = true; 768 List<Cell> kvListExp = new ArrayList<>(); 769 770 List<Put> puts = new ArrayList<>(); 771 for (byte[] r : rows) { 772 Put put = new Put(r); 773 for (byte[] f : families) { 774 for (byte[] q : qualifiers) { 775 KeyValue kv = new KeyValue(r, f, q, 1, VALUE); 776 put.add(kv); 777 kvListExp.add(kv); 778 } 779 } 780 puts.add(put); 781 if (puts.size() > 1000) { 782 ht.put(puts); 783 puts.clear(); 784 } 785 } 786 if (!puts.isEmpty()) { 787 ht.put(puts); 788 puts.clear(); 789 } 790 791 Scan scan = new Scan(); 792 scan.setAsyncPrefetch(true); 793 if (caching > 0) { 794 scan.setCaching(caching); 795 } 796 try (ResultScanner scanner = ht.getScanner(scan)) { 797 assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner); 798 ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener); 799 List<Cell> kvListScan = new ArrayList<>(); 800 Result result; 801 boolean first = true; 802 int actualRows = 0; 803 while ((result = scanner.next()) != null) { 804 ++actualRows; 805 // waiting for cache. see HBASE-17376 806 if (first) { 807 TimeUnit.SECONDS.sleep(1); 808 first = false; 809 } 810 for (Cell kv : result.listCells()) { 811 kvListScan.add(kv); 812 } 813 } 814 assertEquals(rowNumber, actualRows); 815 // These cells may have different rows but it is ok. The Result#getRow 816 // isn't used in the verifyResult() 817 result = Result.create(kvListScan); 818 verifyResult(result, kvListExp, toLog, "Testing async scan"); 819 } 820 821 TEST_UTIL.deleteTable(table); 822 } 823 824 private static byte[][] makeNAsciiWithZeroPrefix(byte[] base, int n) { 825 int maxLength = Integer.toString(n).length(); 826 byte [][] ret = new byte[n][]; 827 for (int i = 0; i < n; i++) { 828 int length = Integer.toString(i).length(); 829 StringBuilder buf = new StringBuilder(Integer.toString(i)); 830 IntStream.range(0, maxLength - length).forEach(v -> buf.insert(0, "0")); 831 byte[] tail = Bytes.toBytes(buf.toString()); 832 ret[i] = Bytes.add(base, tail); 833 } 834 return ret; 835 } 836 837 static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, 838 String msg) { 839 840 LOG.info(msg); 841 LOG.info("Expected count: " + expKvList.size()); 842 LOG.info("Actual count: " + result.size()); 843 if (expKvList.isEmpty()) 844 return; 845 846 int i = 0; 847 for (Cell kv : result.rawCells()) { 848 if (i >= expKvList.size()) { 849 break; // we will check the size later 850 } 851 852 Cell kvExp = expKvList.get(i++); 853 if (toLog) { 854 LOG.info("get kv is: " + kv.toString()); 855 LOG.info("exp kv is: " + kvExp.toString()); 856 } 857 assertTrue("Not equal", kvExp.equals(kv)); 858 } 859 860 assertEquals(expKvList.size(), result.size()); 861 } 862 863 @Test 864 public void testReadExpiredDataForRawScan() throws IOException { 865 TableName tableName = TableName.valueOf(name.getMethodName()); 866 long ts = System.currentTimeMillis() - 10000; 867 byte[] value = Bytes.toBytes("expired"); 868 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 869 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value)); 870 assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)); 871 TEST_UTIL.getAdmin().modifyColumnFamily(tableName, 872 new HColumnDescriptor(FAMILY).setTimeToLive(5)); 873 try (ResultScanner scanner = table.getScanner(FAMILY)) { 874 assertNull(scanner.next()); 875 } 876 try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) { 877 assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER)); 878 assertNull(scanner.next()); 879 } 880 } 881 } 882 883 @Test 884 public void testScanWithColumnsAndFilterAndVersion() throws IOException { 885 TableName tableName = TableName.valueOf(name.getMethodName()); 886 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) { 887 for (int i = 0; i < 4; i++) { 888 Put put = new Put(ROW); 889 put.addColumn(FAMILY, QUALIFIER, VALUE); 890 table.put(put); 891 } 892 893 Scan scan = new Scan(); 894 scan.addColumn(FAMILY, QUALIFIER); 895 scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER))); 896 scan.readVersions(3); 897 898 try (ResultScanner scanner = table.getScanner(scan)) { 899 Result result = scanner.next(); 900 assertEquals(3, result.size()); 901 } 902 } 903 } 904 905 @Test 906 public void testScanWithSameStartRowStopRow() throws IOException { 907 TableName tableName = TableName.valueOf(name.getMethodName()); 908 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 909 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 910 911 Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW); 912 try (ResultScanner scanner = table.getScanner(scan)) { 913 assertNull(scanner.next()); 914 } 915 916 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true); 917 try (ResultScanner scanner = table.getScanner(scan)) { 918 Result result = scanner.next(); 919 assertNotNull(result); 920 assertArrayEquals(ROW, result.getRow()); 921 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 922 assertNull(scanner.next()); 923 } 924 925 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false); 926 try (ResultScanner scanner = table.getScanner(scan)) { 927 assertNull(scanner.next()); 928 } 929 930 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false); 931 try (ResultScanner scanner = table.getScanner(scan)) { 932 assertNull(scanner.next()); 933 } 934 935 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true); 936 try (ResultScanner scanner = table.getScanner(scan)) { 937 assertNull(scanner.next()); 938 } 939 } 940 } 941 942 @Test 943 public void testReverseScanWithFlush() throws Exception { 944 TableName tableName = TableName.valueOf(name.getMethodName()); 945 final int BATCH_SIZE = 10; 946 final int ROWS_TO_INSERT = 100; 947 final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); 948 949 try (Table table = TEST_UTIL.createTable(tableName, FAMILY); 950 Admin admin = TEST_UTIL.getAdmin()) { 951 List<Put> putList = new ArrayList<>(); 952 for (long i = 0; i < ROWS_TO_INSERT; i++) { 953 Put put = new Put(Bytes.toBytes(i)); 954 put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); 955 putList.add(put); 956 957 if (putList.size() >= BATCH_SIZE) { 958 table.put(putList); 959 admin.flush(tableName); 960 putList.clear(); 961 } 962 } 963 964 if (!putList.isEmpty()) { 965 table.put(putList); 966 admin.flush(tableName); 967 putList.clear(); 968 } 969 970 Scan scan = new Scan(); 971 scan.setReversed(true); 972 int count = 0; 973 974 try (ResultScanner results = table.getScanner(scan)) { 975 for (Result result : results) { 976 count++; 977 } 978 } 979 assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, 980 ROWS_TO_INSERT, count); 981 } 982 } 983 984 @Test 985 public void testScannerWithPartialResults() throws Exception { 986 TableName tableName = TableName.valueOf("testScannerWithPartialResults"); 987 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, 988 Bytes.toBytes("c"), 4)) { 989 List<Put> puts = new ArrayList<>(); 990 byte[] largeArray = new byte[10000]; 991 Put put = new Put(Bytes.toBytes("aaaa0")); 992 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 993 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2")); 994 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3")); 995 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4")); 996 puts.add(put); 997 put = new Put(Bytes.toBytes("aaaa1")); 998 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 999 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray); 1000 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray); 1001 puts.add(put); 1002 table.put(puts); 1003 Scan scan = new Scan(); 1004 scan.addFamily(Bytes.toBytes("c")); 1005 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName()); 1006 scan.setMaxResultSize(10001); 1007 scan.setStopRow(Bytes.toBytes("bbbb")); 1008 scan.setFilter(new LimitKVsReturnFilter()); 1009 ResultScanner rs = table.getScanner(scan); 1010 Result result; 1011 int expectedKvNumber = 6; 1012 int returnedKvNumber = 0; 1013 while((result = rs.next()) != null) { 1014 returnedKvNumber += result.listCells().size(); 1015 } 1016 rs.close(); 1017 assertEquals(expectedKvNumber, returnedKvNumber); 1018 } 1019 } 1020 1021 public static class LimitKVsReturnFilter extends FilterBase { 1022 1023 private int cellCount = 0; 1024 1025 @Override 1026 public ReturnCode filterCell(Cell v) throws IOException { 1027 if (cellCount >= 6) { 1028 cellCount++; 1029 return ReturnCode.SKIP; 1030 } 1031 cellCount++; 1032 return ReturnCode.INCLUDE; 1033 } 1034 1035 @Override 1036 public boolean filterAllRemaining() throws IOException { 1037 if (cellCount < 7) { 1038 return false; 1039 } 1040 cellCount++; 1041 return true; 1042 } 1043 1044 @Override 1045 public String toString() { 1046 return this.getClass().getSimpleName(); 1047 } 1048 1049 public static LimitKVsReturnFilter parseFrom(final byte [] pbBytes) 1050 throws DeserializationException { 1051 return new LimitKVsReturnFilter(); 1052 } 1053 } 1054}