001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; 021import static org.apache.hadoop.hbase.client.FromClientSide3TestBase.generateHugeValue; 022import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; 023import static org.hamcrest.CoreMatchers.instanceOf; 024import static org.hamcrest.MatcherAssert.assertThat; 025import static org.junit.Assert.assertArrayEquals; 026import static org.junit.Assert.assertEquals; 027import static org.junit.Assert.assertFalse; 028import static org.junit.Assert.assertNotNull; 029import static org.junit.Assert.assertNull; 030import static org.junit.Assert.assertTrue; 031import static org.junit.Assert.fail; 032 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.List; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.Cell; 040import org.apache.hadoop.hbase.CompareOperator; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.HRegionLocation; 045import org.apache.hadoop.hbase.HTestConst; 046import org.apache.hadoop.hbase.KeyValue; 047import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 048import org.apache.hadoop.hbase.StartTestingClusterOption; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.TableNameTestRule; 051import org.apache.hadoop.hbase.TableNotFoundException; 052import org.apache.hadoop.hbase.client.Scan.ReadType; 053import org.apache.hadoop.hbase.exceptions.DeserializationException; 054import org.apache.hadoop.hbase.filter.BinaryComparator; 055import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 056import org.apache.hadoop.hbase.filter.ColumnRangeFilter; 057import org.apache.hadoop.hbase.filter.FilterBase; 058import org.apache.hadoop.hbase.filter.QualifierFilter; 059import org.apache.hadoop.hbase.regionserver.HRegionServer; 060import org.apache.hadoop.hbase.testclassification.ClientTests; 061import org.apache.hadoop.hbase.testclassification.LargeTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 064import org.junit.AfterClass; 065import org.junit.ClassRule; 066import org.junit.Rule; 067import org.junit.Test; 068import org.junit.experimental.categories.Category; 069import org.junit.runner.RunWith; 070import org.junit.runners.Parameterized; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 075 076/** 077 * A client-side test, mostly testing scanners with various parameters. Parameterized on different 078 * registry implementations. 079 */ 080@Category({ ClientTests.class, LargeTests.class }) 081@RunWith(Parameterized.class) 082public class TestScannersFromClientSide { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestScannersFromClientSide.class); 087 088 private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); 089 090 private static HBaseTestingUtil TEST_UTIL; 091 private static byte[] ROW = Bytes.toBytes("testRow"); 092 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 093 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 094 private static byte[] VALUE = Bytes.toBytes("testValue"); 095 096 @Rule 097 public TableNameTestRule name = new TableNameTestRule(); 098 099 @AfterClass 100 public static void tearDownAfterClass() throws Exception { 101 if (TEST_UTIL != null) { 102 TEST_UTIL.shutdownMiniCluster(); 103 } 104 } 105 106 @Parameterized.Parameters 107 public static Collection<Object[]> parameters() { 108 return Arrays.asList(new Object[][] { { MasterRegistry.class, 1 }, { MasterRegistry.class, 2 }, 109 { ZKConnectionRegistry.class, 1 } }); 110 } 111 112 /** 113 * JUnit does not provide an easy way to run a hook after each parameterized run. Without that 114 * there is no easy way to restart the test cluster after each parameterized run. Annotation 115 * BeforeParam does not work either because it runs before parameterization and hence does not 116 * have access to the test parameters (which is weird). This *hack* checks if the current instance 117 * of test cluster configuration has the passed parameterized configs. In such a case, we can just 118 * reuse the cluster for test and do not need to initialize from scratch. While this is a hack, it 119 * saves a ton of time for the full test and de-flakes it. 120 */ 121 private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) { 122 // initialize() is called for every unit test, however we only want to reset the cluster state 123 // at the end of every parameterized run. 124 if (TEST_UTIL == null) { 125 return false; 126 } 127 Configuration conf = TEST_UTIL.getConfiguration(); 128 Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 129 ZKConnectionRegistry.class); 130 int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 131 AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT); 132 return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; 133 } 134 135 public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception { 136 if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { 137 return; 138 } 139 if (TEST_UTIL != null) { 140 // We reached the end of a parameterized run, clean up the cluster. 141 TEST_UTIL.shutdownMiniCluster(); 142 } 143 TEST_UTIL = new HBaseTestingUtil(); 144 Configuration conf = TEST_UTIL.getConfiguration(); 145 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); 146 conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl, 147 ConnectionRegistry.class); 148 Preconditions.checkArgument(numHedgedReqs > 0); 149 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); 150 StartTestingClusterOption.Builder builder = StartTestingClusterOption.builder(); 151 // Multiple masters needed only when hedged reads for master registry are enabled. 152 builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3); 153 TEST_UTIL.startMiniCluster(builder.build()); 154 } 155 156 /** 157 * Test from client side for batch of scan 158 */ 159 @Test 160 public void testScanBatch() throws Exception { 161 final TableName tableName = name.getTableName(); 162 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); 163 164 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 165 166 Put put; 167 Scan scan; 168 Delete delete; 169 Result result; 170 ResultScanner scanner; 171 boolean toLog = true; 172 List<Cell> kvListExp; 173 174 // table: row, family, c0:0, c1:1, ... , c7:7 175 put = new Put(ROW); 176 for (int i = 0; i < QUALIFIERS.length; i++) { 177 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 178 put.add(kv); 179 } 180 ht.put(put); 181 182 // table: row, family, c0:0, c1:1, ..., c6:2, c6:6 , c7:7 183 put = new Put(ROW); 184 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[6], 2, VALUE); 185 put.add(kv); 186 ht.put(put); 187 188 // delete upto ts: 3 189 delete = new Delete(ROW); 190 delete.addFamily(FAMILY, 3); 191 ht.delete(delete); 192 193 // without batch 194 scan = new Scan().withStartRow(ROW); 195 scan.readAllVersions(); 196 scanner = ht.getScanner(scan); 197 198 // c4:4, c5:5, c6:6, c7:7 199 kvListExp = new ArrayList<>(); 200 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 201 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 202 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 203 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 204 result = scanner.next(); 205 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 206 207 // with batch 208 scan = new Scan().withStartRow(ROW); 209 scan.readAllVersions(); 210 scan.setBatch(2); 211 scanner = ht.getScanner(scan); 212 213 // First batch: c4:4, c5:5 214 kvListExp = new ArrayList<>(); 215 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[4], 4, VALUE)); 216 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); 217 result = scanner.next(); 218 verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); 219 220 // Second batch: c6:6, c7:7 221 kvListExp = new ArrayList<>(); 222 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[6], 6, VALUE)); 223 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); 224 result = scanner.next(); 225 verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); 226 227 } 228 229 @Test 230 public void testMaxResultSizeIsSetToDefault() throws Exception { 231 final TableName tableName = name.getTableName(); 232 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 233 234 // The max result size we expect the scan to use by default. 235 long expectedMaxResultSize = 236 TEST_UTIL.getConfiguration().getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 237 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 238 239 int numRows = 5; 240 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 241 242 int numQualifiers = 10; 243 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 244 245 // Specify the cell size such that a single row will be larger than the default 246 // value of maxResultSize. This means that Scan RPCs should return at most a single 247 // result back to the client. 248 int cellSize = (int) (expectedMaxResultSize / (numQualifiers - 1)); 249 byte[] cellValue = Bytes.createMaxByteArray(cellSize); 250 251 Put put; 252 List<Put> puts = new ArrayList<>(); 253 for (int row = 0; row < ROWS.length; row++) { 254 put = new Put(ROWS[row]); 255 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 256 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], cellValue); 257 put.add(kv); 258 } 259 puts.add(put); 260 } 261 ht.put(puts); 262 263 // Create a scan with the default configuration. 264 Scan scan = new Scan(); 265 266 try (ResultScanner scanner = ht.getScanner(scan)) { 267 assertThat(scanner, instanceOf(AsyncTableResultScanner.class)); 268 scanner.next(); 269 AsyncTableResultScanner s = (AsyncTableResultScanner) scanner; 270 // The scanner should have, at most, a single result in its cache. If there more results 271 // exists 272 // in the cache it means that more than the expected max result size was fetched. 273 assertTrue("The cache contains: " + s.getCacheSize() + " results", s.getCacheSize() <= 1); 274 } 275 } 276 277 /** 278 * Scan on not existing table should throw the exception with correct message 279 */ 280 @Test 281 public void testScannerForNotExistingTable() { 282 String[] tableNames = { "A", "Z", "A:A", "Z:Z" }; 283 for (String tableName : tableNames) { 284 try { 285 Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName)); 286 testSmallScan(table, true, 1, 5); 287 fail("TableNotFoundException was not thrown"); 288 } catch (TableNotFoundException e) { 289 // We expect that the message for TableNotFoundException would have only the table name only 290 // Otherwise that would mean that localeRegionInMeta doesn't work properly 291 assertEquals(e.getMessage(), tableName); 292 } catch (Exception e) { 293 fail("Unexpected exception " + e.getMessage()); 294 } 295 } 296 } 297 298 @Test 299 public void testSmallScan() throws Exception { 300 final TableName tableName = name.getTableName(); 301 302 int numRows = 10; 303 byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); 304 305 int numQualifiers = 10; 306 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); 307 308 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 309 310 Put put; 311 List<Put> puts = new ArrayList<>(); 312 for (int row = 0; row < ROWS.length; row++) { 313 put = new Put(ROWS[row]); 314 for (int qual = 0; qual < QUALIFIERS.length; qual++) { 315 KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); 316 put.add(kv); 317 } 318 puts.add(put); 319 } 320 ht.put(puts); 321 322 int expectedRows = numRows; 323 int expectedCols = numRows * numQualifiers; 324 325 // Test normal and reversed 326 testSmallScan(ht, true, expectedRows, expectedCols); 327 testSmallScan(ht, false, expectedRows, expectedCols); 328 } 329 330 /** 331 * Run through a variety of test configurations with a small scan 332 */ 333 private void testSmallScan(Table table, boolean reversed, int rows, int columns) 334 throws Exception { 335 Scan baseScan = new Scan(); 336 baseScan.setReversed(reversed); 337 baseScan.setReadType(ReadType.PREAD); 338 339 Scan scan = new Scan(baseScan); 340 verifyExpectedCounts(table, scan, rows, columns); 341 342 scan = new Scan(baseScan); 343 scan.setMaxResultSize(1); 344 verifyExpectedCounts(table, scan, rows, columns); 345 346 scan = new Scan(baseScan); 347 scan.setMaxResultSize(1); 348 scan.setCaching(Integer.MAX_VALUE); 349 verifyExpectedCounts(table, scan, rows, columns); 350 } 351 352 private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, 353 int expectedCellCount) throws Exception { 354 ResultScanner scanner = table.getScanner(scan); 355 356 int rowCount = 0; 357 int cellCount = 0; 358 Result r = null; 359 while ((r = scanner.next()) != null) { 360 rowCount++; 361 cellCount += r.rawCells().length; 362 } 363 364 assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, 365 expectedRowCount == rowCount); 366 assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, 367 expectedCellCount == cellCount); 368 scanner.close(); 369 } 370 371 /** 372 * Test from client side for get with maxResultPerCF set 373 */ 374 @Test 375 public void testGetMaxResults() throws Exception { 376 final TableName tableName = name.getTableName(); 377 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 378 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 379 380 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 381 382 Get get; 383 Put put; 384 Result result; 385 boolean toLog = true; 386 List<Cell> kvListExp; 387 388 kvListExp = new ArrayList<>(); 389 // Insert one CF for row[0] 390 put = new Put(ROW); 391 for (int i = 0; i < 10; i++) { 392 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 393 put.add(kv); 394 kvListExp.add(kv); 395 } 396 ht.put(put); 397 398 get = new Get(ROW); 399 result = ht.get(get); 400 verifyResult(result, kvListExp, toLog, "Testing without setting maxResults"); 401 402 get = new Get(ROW); 403 get.setMaxResultsPerColumnFamily(2); 404 result = ht.get(get); 405 kvListExp = new ArrayList<>(); 406 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[0], 1, VALUE)); 407 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 408 verifyResult(result, kvListExp, toLog, "Testing basic setMaxResults"); 409 410 // Filters: ColumnRangeFilter 411 get = new Get(ROW); 412 get.setMaxResultsPerColumnFamily(5); 413 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true)); 414 result = ht.get(get); 415 kvListExp = new ArrayList<>(); 416 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[2], 1, VALUE)); 417 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 418 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 419 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 420 verifyResult(result, kvListExp, toLog, "Testing single CF with CRF"); 421 422 // Insert two more CF for row[0] 423 // 20 columns for CF2, 10 columns for CF1 424 put = new Put(ROW); 425 for (int i = 0; i < QUALIFIERS.length; i++) { 426 KeyValue kv = new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE); 427 put.add(kv); 428 } 429 ht.put(put); 430 431 put = new Put(ROW); 432 for (int i = 0; i < 10; i++) { 433 KeyValue kv = new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE); 434 put.add(kv); 435 } 436 ht.put(put); 437 438 get = new Get(ROW); 439 get.setMaxResultsPerColumnFamily(12); 440 get.addFamily(FAMILIES[1]); 441 get.addFamily(FAMILIES[2]); 442 result = ht.get(get); 443 kvListExp = new ArrayList<>(); 444 // Exp: CF1:q0, ..., q9, CF2: q0, q1, q10, q11, ..., q19 445 for (int i = 0; i < 10; i++) { 446 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 447 } 448 for (int i = 0; i < 2; i++) { 449 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 450 } 451 for (int i = 10; i < 20; i++) { 452 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 453 } 454 verifyResult(result, kvListExp, toLog, "Testing multiple CFs"); 455 456 // Filters: ColumnRangeFilter and ColumnPrefixFilter 457 get = new Get(ROW); 458 get.setMaxResultsPerColumnFamily(3); 459 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, null, true)); 460 result = ht.get(get); 461 kvListExp = new ArrayList<>(); 462 for (int i = 2; i < 5; i++) { 463 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 464 } 465 for (int i = 2; i < 5; i++) { 466 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); 467 } 468 for (int i = 2; i < 5; i++) { 469 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 470 } 471 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + CRF"); 472 473 get = new Get(ROW); 474 get.setMaxResultsPerColumnFamily(7); 475 get.setFilter(new ColumnPrefixFilter(QUALIFIERS[1])); 476 result = ht.get(get); 477 kvListExp = new ArrayList<>(); 478 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[1], 1, VALUE)); 479 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[1], 1, VALUE)); 480 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[1], 1, VALUE)); 481 for (int i = 10; i < 16; i++) { 482 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); 483 } 484 verifyResult(result, kvListExp, toLog, "Testing multiple CFs + PFF"); 485 486 } 487 488 /** 489 * Test from client side for scan with maxResultPerCF set 490 */ 491 @Test 492 public void testScanMaxResults() throws Exception { 493 final TableName tableName = name.getTableName(); 494 byte[][] ROWS = HTestConst.makeNAscii(ROW, 2); 495 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 496 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); 497 498 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 499 500 Put put; 501 Scan scan; 502 Result result; 503 boolean toLog = true; 504 List<Cell> kvListExp, kvListScan; 505 506 kvListExp = new ArrayList<>(); 507 508 for (int r = 0; r < ROWS.length; r++) { 509 put = new Put(ROWS[r]); 510 for (int c = 0; c < FAMILIES.length; c++) { 511 for (int q = 0; q < QUALIFIERS.length; q++) { 512 KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); 513 put.add(kv); 514 if (q < 4) { 515 kvListExp.add(kv); 516 } 517 } 518 } 519 ht.put(put); 520 } 521 522 scan = new Scan(); 523 scan.setMaxResultsPerColumnFamily(4); 524 ResultScanner scanner = ht.getScanner(scan); 525 kvListScan = new ArrayList<>(); 526 while ((result = scanner.next()) != null) { 527 for (Cell kv : result.listCells()) { 528 kvListScan.add(kv); 529 } 530 } 531 result = Result.create(kvListScan); 532 verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); 533 534 } 535 536 /** 537 * Test from client side for get with rowOffset 538 */ 539 @Test 540 public void testGetRowOffset() throws Exception { 541 final TableName tableName = name.getTableName(); 542 byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); 543 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); 544 545 Table ht = TEST_UTIL.createTable(tableName, FAMILIES); 546 547 Get get; 548 Put put; 549 Result result; 550 boolean toLog = true; 551 List<Cell> kvListExp; 552 553 // Insert one CF for row 554 kvListExp = new ArrayList<>(); 555 put = new Put(ROW); 556 for (int i = 0; i < 10; i++) { 557 KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); 558 put.add(kv); 559 // skipping first two kvs 560 if (i < 2) { 561 continue; 562 } 563 kvListExp.add(kv); 564 } 565 ht.put(put); 566 567 // setting offset to 2 568 get = new Get(ROW); 569 get.setRowOffsetPerColumnFamily(2); 570 result = ht.get(get); 571 verifyResult(result, kvListExp, toLog, "Testing basic setRowOffset"); 572 573 // setting offset to 20 574 get = new Get(ROW); 575 get.setRowOffsetPerColumnFamily(20); 576 result = ht.get(get); 577 kvListExp = new ArrayList<>(); 578 verifyResult(result, kvListExp, toLog, "Testing offset > #kvs"); 579 580 // offset + maxResultPerCF 581 get = new Get(ROW); 582 get.setRowOffsetPerColumnFamily(4); 583 get.setMaxResultsPerColumnFamily(5); 584 result = ht.get(get); 585 kvListExp = new ArrayList<>(); 586 for (int i = 4; i < 9; i++) { 587 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE)); 588 } 589 verifyResult(result, kvListExp, toLog, "Testing offset + setMaxResultsPerCF"); 590 591 // Filters: ColumnRangeFilter 592 get = new Get(ROW); 593 get.setRowOffsetPerColumnFamily(1); 594 get.setFilter(new ColumnRangeFilter(QUALIFIERS[2], true, QUALIFIERS[5], true)); 595 result = ht.get(get); 596 kvListExp = new ArrayList<>(); 597 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[3], 1, VALUE)); 598 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[4], 1, VALUE)); 599 kvListExp.add(new KeyValue(ROW, FAMILIES[0], QUALIFIERS[5], 1, VALUE)); 600 verifyResult(result, kvListExp, toLog, "Testing offset with CRF"); 601 602 // Insert into two more CFs for row 603 // 10 columns for CF2, 10 columns for CF1 604 for (int j = 2; j > 0; j--) { 605 put = new Put(ROW); 606 for (int i = 0; i < 10; i++) { 607 KeyValue kv = new KeyValue(ROW, FAMILIES[j], QUALIFIERS[i], 1, VALUE); 608 put.add(kv); 609 } 610 ht.put(put); 611 } 612 613 get = new Get(ROW); 614 get.setRowOffsetPerColumnFamily(4); 615 get.setMaxResultsPerColumnFamily(2); 616 get.addFamily(FAMILIES[1]); 617 get.addFamily(FAMILIES[2]); 618 result = ht.get(get); 619 kvListExp = new ArrayList<>(); 620 // Exp: CF1:q4, q5, CF2: q4, q5 621 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[4], 1, VALUE)); 622 kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[5], 1, VALUE)); 623 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[4], 1, VALUE)); 624 kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); 625 verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults"); 626 } 627 628 @Test 629 public void testRawScanExpiredCell() throws Exception { 630 final TableName tableName = name.getTableName(); 631 try (final Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 632 final Put put = new Put(ROW); 633 put.addColumn(FAMILY, QUALIFIER, VALUE); 634 put.setTTL(0); 635 table.put(put); 636 final Scan scan = new Scan().setRaw(true); 637 try (final ResultScanner scanner = table.getScanner(scan)) { 638 final Result result = scanner.next(); 639 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 640 assertNull(scanner.next()); 641 } 642 } finally { 643 TEST_UTIL.deleteTable(tableName); 644 } 645 } 646 647 @Test 648 public void testScanRawDeleteFamilyVersion() throws Exception { 649 TableName tableName = name.getTableName(); 650 TEST_UTIL.createTable(tableName, FAMILY); 651 Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); 652 conf.set(RPC_CODEC_CONF_KEY, ""); 653 conf.set(DEFAULT_CODEC_CLASS, ""); 654 try (Connection connection = ConnectionFactory.createConnection(conf); 655 Table table = connection.getTable(tableName)) { 656 Delete delete = new Delete(ROW); 657 delete.addFamilyVersion(FAMILY, 0L); 658 table.delete(delete); 659 Scan scan = new Scan().withStartRow(ROW).setRaw(true); 660 ResultScanner scanner = table.getScanner(scan); 661 int count = 0; 662 while (scanner.next() != null) { 663 count++; 664 } 665 assertEquals(1, count); 666 } finally { 667 TEST_UTIL.deleteTable(tableName); 668 } 669 } 670 671 /** 672 * Test from client side for scan while the region is reopened on the same region server. 673 */ 674 @Test 675 public void testScanOnReopenedRegion() throws Exception { 676 final TableName tableName = name.getTableName(); 677 byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2); 678 679 Table ht = TEST_UTIL.createTable(tableName, FAMILY); 680 681 Put put; 682 Scan scan; 683 Result result; 684 ResultScanner scanner; 685 boolean toLog = false; 686 List<Cell> kvListExp; 687 688 // table: row, family, c0:0, c1:1 689 put = new Put(ROW); 690 for (int i = 0; i < QUALIFIERS.length; i++) { 691 KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE); 692 put.add(kv); 693 } 694 ht.put(put); 695 696 scan = new Scan().withStartRow(ROW); 697 scanner = ht.getScanner(scan); 698 699 HRegionLocation loc; 700 701 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 702 loc = locator.getRegionLocation(ROW); 703 } 704 RegionInfo hri = loc.getRegion(); 705 SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 706 byte[] regionName = hri.getRegionName(); 707 int i = cluster.getServerWith(regionName); 708 HRegionServer rs = cluster.getRegionServer(i); 709 LOG.info("Unassigning " + hri); 710 TEST_UTIL.getAdmin().unassign(hri.getRegionName(), true); 711 long startTime = EnvironmentEdgeManager.currentTime(); 712 long timeOut = 10000; 713 boolean offline = false; 714 while (true) { 715 if (rs.getOnlineRegion(regionName) == null) { 716 offline = true; 717 break; 718 } 719 assertTrue("Timed out in closing the testing region", 720 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 721 } 722 assertTrue(offline); 723 LOG.info("Assigning " + hri); 724 TEST_UTIL.getAdmin().assign(hri.getRegionName()); 725 startTime = EnvironmentEdgeManager.currentTime(); 726 while (true) { 727 rs = cluster.getRegionServer(cluster.getServerWith(regionName)); 728 if (rs != null && rs.getOnlineRegion(regionName) != null) { 729 offline = false; 730 break; 731 } 732 assertTrue("Timed out in open the testing region", 733 EnvironmentEdgeManager.currentTime() < startTime + timeOut); 734 } 735 assertFalse(offline); 736 737 // c0:0, c1:1 738 kvListExp = new ArrayList<>(); 739 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE)); 740 kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE)); 741 result = scanner.next(); 742 verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region"); 743 } 744 745 static void verifyResult(Result result, List<Cell> expKvList, boolean toLog, String msg) { 746 747 LOG.info(msg); 748 LOG.info("Expected count: " + expKvList.size()); 749 LOG.info("Actual count: " + result.size()); 750 if (expKvList.isEmpty()) { 751 return; 752 } 753 754 int i = 0; 755 for (Cell kv : result.rawCells()) { 756 if (i >= expKvList.size()) { 757 break; // we will check the size later 758 } 759 760 Cell kvExp = expKvList.get(i++); 761 if (toLog) { 762 LOG.info("get kv is: " + kv.toString()); 763 LOG.info("exp kv is: " + kvExp.toString()); 764 } 765 assertTrue("Not equal", kvExp.equals(kv)); 766 } 767 768 assertEquals(expKvList.size(), result.size()); 769 } 770 771 @Test 772 public void testReadExpiredDataForRawScan() throws IOException { 773 TableName tableName = name.getTableName(); 774 long ts = EnvironmentEdgeManager.currentTime() - 10000; 775 byte[] value = Bytes.toBytes("expired"); 776 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 777 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, ts, value)); 778 assertArrayEquals(value, table.get(new Get(ROW)).getValue(FAMILY, QUALIFIER)); 779 TEST_UTIL.getAdmin().modifyColumnFamily(tableName, 780 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setTimeToLive(5).build()); 781 try (ResultScanner scanner = table.getScanner(FAMILY)) { 782 assertNull(scanner.next()); 783 } 784 try (ResultScanner scanner = table.getScanner(new Scan().setRaw(true))) { 785 assertArrayEquals(value, scanner.next().getValue(FAMILY, QUALIFIER)); 786 assertNull(scanner.next()); 787 } 788 } 789 } 790 791 @Test 792 public void testScanWithColumnsAndFilterAndVersion() throws IOException { 793 TableName tableName = name.getTableName(); 794 long now = EnvironmentEdgeManager.currentTime(); 795 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) { 796 for (int i = 0; i < 4; i++) { 797 Put put = new Put(ROW); 798 put.addColumn(FAMILY, QUALIFIER, now + i, VALUE); 799 table.put(put); 800 } 801 802 Scan scan = new Scan(); 803 scan.addColumn(FAMILY, QUALIFIER); 804 scan.setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(QUALIFIER))); 805 scan.readVersions(3); 806 807 try (ResultScanner scanner = table.getScanner(scan)) { 808 Result result = scanner.next(); 809 assertEquals(3, result.size()); 810 } 811 } 812 } 813 814 @Test 815 public void testScanWithSameStartRowStopRow() throws IOException { 816 TableName tableName = name.getTableName(); 817 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 818 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); 819 820 Scan scan = new Scan().withStartRow(ROW).withStopRow(ROW); 821 try (ResultScanner scanner = table.getScanner(scan)) { 822 assertNull(scanner.next()); 823 } 824 825 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, true); 826 try (ResultScanner scanner = table.getScanner(scan)) { 827 Result result = scanner.next(); 828 assertNotNull(result); 829 assertArrayEquals(ROW, result.getRow()); 830 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 831 assertNull(scanner.next()); 832 } 833 834 scan = new Scan().withStartRow(ROW, true).withStopRow(ROW, false); 835 try (ResultScanner scanner = table.getScanner(scan)) { 836 assertNull(scanner.next()); 837 } 838 839 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, false); 840 try (ResultScanner scanner = table.getScanner(scan)) { 841 assertNull(scanner.next()); 842 } 843 844 scan = new Scan().withStartRow(ROW, false).withStopRow(ROW, true); 845 try (ResultScanner scanner = table.getScanner(scan)) { 846 assertNull(scanner.next()); 847 } 848 } 849 } 850 851 @Test 852 public void testReverseScanWithFlush() throws Exception { 853 TableName tableName = name.getTableName(); 854 final int BATCH_SIZE = 10; 855 final int ROWS_TO_INSERT = 100; 856 final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); 857 858 try (Table table = TEST_UTIL.createTable(tableName, FAMILY); 859 Admin admin = TEST_UTIL.getAdmin()) { 860 List<Put> putList = new ArrayList<>(); 861 for (long i = 0; i < ROWS_TO_INSERT; i++) { 862 Put put = new Put(Bytes.toBytes(i)); 863 put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); 864 putList.add(put); 865 866 if (putList.size() >= BATCH_SIZE) { 867 table.put(putList); 868 admin.flush(tableName); 869 putList.clear(); 870 } 871 } 872 873 if (!putList.isEmpty()) { 874 table.put(putList); 875 admin.flush(tableName); 876 putList.clear(); 877 } 878 879 Scan scan = new Scan(); 880 scan.setReversed(true); 881 int count = 0; 882 883 try (ResultScanner results = table.getScanner(scan)) { 884 for (Result result : results) { 885 count++; 886 } 887 } 888 assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, 889 ROWS_TO_INSERT, count); 890 } 891 } 892 893 @Test 894 public void testScannerWithPartialResults() throws Exception { 895 TableName tableName = TableName.valueOf("testScannerWithPartialResults"); 896 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, Bytes.toBytes("c"), 4)) { 897 List<Put> puts = new ArrayList<>(); 898 byte[] largeArray = new byte[10000]; 899 Put put = new Put(Bytes.toBytes("aaaa0")); 900 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 901 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), Bytes.toBytes("2")); 902 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), Bytes.toBytes("3")); 903 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("4"), Bytes.toBytes("4")); 904 puts.add(put); 905 put = new Put(Bytes.toBytes("aaaa1")); 906 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("1"), Bytes.toBytes("1")); 907 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("2"), largeArray); 908 put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("3"), largeArray); 909 puts.add(put); 910 table.put(puts); 911 Scan scan = new Scan(); 912 scan.addFamily(Bytes.toBytes("c")); 913 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getName()); 914 scan.setMaxResultSize(10001); 915 scan.withStopRow(Bytes.toBytes("bbbb")); 916 scan.setFilter(new LimitKVsReturnFilter()); 917 ResultScanner rs = table.getScanner(scan); 918 Result result; 919 int expectedKvNumber = 6; 920 int returnedKvNumber = 0; 921 while ((result = rs.next()) != null) { 922 returnedKvNumber += result.listCells().size(); 923 } 924 rs.close(); 925 assertEquals(expectedKvNumber, returnedKvNumber); 926 } 927 } 928 929 public static class LimitKVsReturnFilter extends FilterBase { 930 931 private int cellCount = 0; 932 933 @Override 934 public ReturnCode filterCell(Cell v) throws IOException { 935 if (cellCount >= 6) { 936 cellCount++; 937 return ReturnCode.SKIP; 938 } 939 cellCount++; 940 return ReturnCode.INCLUDE; 941 } 942 943 @Override 944 public boolean filterAllRemaining() throws IOException { 945 if (cellCount < 7) { 946 return false; 947 } 948 cellCount++; 949 return true; 950 } 951 952 @Override 953 public String toString() { 954 return this.getClass().getSimpleName(); 955 } 956 957 public static LimitKVsReturnFilter parseFrom(final byte[] pbBytes) 958 throws DeserializationException { 959 return new LimitKVsReturnFilter(); 960 } 961 } 962}