001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.START_KEY_BYTES; 021import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1; 022import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2; 023import static org.junit.jupiter.api.Assertions.assertEquals; 024import static org.junit.jupiter.api.Assertions.assertFalse; 025import static org.junit.jupiter.api.Assertions.assertNotNull; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027import static org.junit.jupiter.api.Assertions.fail; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.List; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellUtil; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.HTestConst; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.UnknownScannerException; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 040import org.apache.hadoop.hbase.client.Delete; 041import 
org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test of a long-lived scanner validating as we go.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestScanner {

  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final byte[] FIRST_ROW = HConstants.EMPTY_START_ROW;
  private static final byte[][] COLS = { HConstants.CATALOG_FAMILY };
  private static final byte[][] EXPLICIT_COLS =
    { HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
      // TODO ryan
      // HConstants.STARTCODE_QUALIFIER
    };

  static final TableDescriptor TESTTABLEDESC =
    TableDescriptorBuilder.newBuilder(TableName.valueOf("testscanner"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.CATALOG_FAMILY)
        // Ten is an arbitrary number. Keep versions to help debugging.
        .setMaxVersions(10).setBlockCacheEnabled(false).setBlocksize(8 * 1024).build())
      .build();

  /** RegionInfo (whole key space of the test table) that {@link #testScanner()} stores. */
  public static final RegionInfo REGION_INFO =
    RegionInfoBuilder.newBuilder(TESTTABLEDESC.getTableName()).build();

  /** Row under which {@link #testScanner()} writes the region info and server address. */
  private static final byte[] ROW_KEY = REGION_INFO.getRegionName();

  private HRegion region;

  private final byte[] firstRowBytes;
  private final byte[] secondRowBytes;
  private final byte[] thirdRowBytes;
  private final byte[] col1;

  public TestScanner() {
    // Copy so the shared START_KEY_BYTES constant can never be modified through these fields.
    firstRowBytes = START_KEY_BYTES.clone();
    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] =
      (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
    col1 = Bytes.toBytes("column1");
  }

  /**
   * Test basic stop row filter works.
   */
  @Test
  public void testStopRow() throws Exception {
    byte[] startrow = Bytes.toBytes("bbb");
    byte[] stoprow = Bytes.toBytes("ccc");
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
      List<Cell> results = new ArrayList<>();
      // Do simple test of getting one row only first: [abc, abd) holds a single row, so the
      // first next() call is expected to already report that nothing more follows.
      Scan scan = new Scan().withStartRow(Bytes.toBytes("abc")).withStopRow(Bytes.toBytes("abd"));
      scan.addFamily(HConstants.CATALOG_FAMILY);
      try (InternalScanner s = region.getScanner(scan)) {
        int count = 0;
        while (s.next(results)) {
          count++;
        }
        assertEquals(0, count);
      }

      // Now do something a bit more involved: scan [bbb, ccc) and check every returned cell,
      // including the final batch that arrives together with next() == false.
      scan = new Scan().withStartRow(startrow).withStopRow(stoprow);
      scan.addFamily(HConstants.CATALOG_FAMILY);
      int rows = 0;
      try (InternalScanner s = region.getScanner(scan)) {
        boolean hasMore;
        do {
          // Clear per batch; otherwise results.get(0) would always be the scan's first cell.
          results.clear();
          hasMore = s.next(results);
          if (!results.isEmpty()) {
            if (rows == 0) {
              assertTrue(CellUtil.matchingRows(results.get(0), startrow));
            }
            for (Cell cell : results) {
              assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(cell)) > 0,
                "row at or past the stop row: " + cell);
            }
            rows++;
          }
        } while (hasMore);
      }
      // We got something back.
      assertTrue(rows > 10);
    } finally {
      HBaseTestingUtil.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Runs {@code scan} over the catalog family and asserts that every returned row starts with
   * "ab" (the prefix {@link #testFilters()} filters on) and that at least one cell came back.
   */
  void rowPrefixFilter(Scan scan) throws IOException {
    List<Cell> results = new ArrayList<>();
    scan.addFamily(HConstants.CATALOG_FAMILY);
    int cells = 0;
    try (InternalScanner s = region.getScanner(scan)) {
      boolean hasMore = true;
      while (hasMore) {
        hasMore = s.next(results);
        for (Cell kv : results) {
          byte[] row = CellUtil.cloneRow(kv);
          assertEquals((byte) 'a', row[0]);
          assertEquals((byte) 'b', row[1]);
          cells++;
        }
        results.clear();
      }
    }
    assertTrue(cells > 0, "prefix-filtered scan returned no cells");
  }

  /**
   * Runs {@code scan} over the catalog family and asserts that no returned row sorts after
   * {@code stopRow} and that at least one cell came back.
   */
  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
    List<Cell> results = new ArrayList<>();
    scan.addFamily(HConstants.CATALOG_FAMILY);
    int cells = 0;
    try (InternalScanner s = region.getScanner(scan)) {
      boolean hasMore = true;
      while (hasMore) {
        hasMore = s.next(results);
        for (Cell kv : results) {
          assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
          cells++;
        }
        results.clear();
      }
    }
    assertTrue(cells > 0, "stop-filtered scan returned no cells");
  }

  @Test
  public void testFilters() throws IOException {
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
      byte[] prefix = Bytes.toBytes("ab");
      Filter newFilter = new PrefixFilter(prefix);
      Scan scan = new Scan();
      scan.setFilter(newFilter);
      rowPrefixFilter(scan);

      byte[] stopRow = Bytes.toBytes("bbc");
      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
      scan = new Scan();
      scan.setFilter(newFilter);
      rowInclusiveStopFilter(scan, stopRow);
    } finally {
      HBaseTestingUtil.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Test that closing a scanner while a client is using it doesn't throw NPEs but instead a
   * UnknownScannerException. HBASE-2503
   */
  @Test
  public void testRaceBetweenClientAndTimeout() throws Exception {
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HTestConst.addContent(this.region, HConstants.CATALOG_FAMILY);
      Scan scan = new Scan();
      InternalScanner s = region.getScanner(scan);
      List<Cell> results = new ArrayList<>();
      // The first call must succeed; only the call after close() may fail.
      s.next(results);
      s.close();
      try {
        s.next(results);
        fail("We don't want anything more, we should be failing");
      } catch (UnknownScannerException ex) {
        // expected: next() on a closed scanner reports UnknownScannerException
      }
    } finally {
      HBaseTestingUtil.closeRegionAndWAL(this.region);
    }
  }

  /**
   * The test! Writes region info and server addresses into a single row and checks, via both a
   * scan and a get, that they can be read back from the memstore, after flushes and after the
   * region has been closed and reopened.
   */
  @Test
  public void testScanner() throws IOException {
    try {
      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      Table table = new RegionAsTable(region);

      // Write information to the meta table
      Put put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
        RegionInfo.toByteArray(REGION_INFO));
      table.put(put);

      // What we just committed is in the memstore. Verify that we can get
      // it back both with scanning and get
      validateStoredInfo(table, null);

      // Close and re-open, then verify we can get the data back now that it is on disk.
      table = reopenRegion();
      validateStoredInfo(table, null);

      // Store some new information
      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtil.randomFreePort();
      putServerAddress(table, address);

      // Validate that we can still get the RegionInfo, even though it is in
      // an older row on disk and there is a newer row in the memstore
      validateStoredInfo(table, address);

      // flush cache, then validate again
      region.flush(true);
      validateStoredInfo(table, address);

      // Close and reopen, then validate again
      table = reopenRegion();
      validateStoredInfo(table, address);

      // Now update the information again and repeat memstore / flush / reopen checks
      address = "bar.foo.com:4321";
      putServerAddress(table, address);
      validateStoredInfo(table, address);

      region.flush(true);
      validateStoredInfo(table, address);

      table = reopenRegion();
      validateStoredInfo(table, address);
    } finally {
      // clean up
      HBaseTestingUtil.closeRegionAndWAL(this.region);
    }
  }

  /** Closes the test region, reopens it from its files and returns a Table view over it. */
  private Table reopenRegion() throws IOException {
    region.close();
    region = HRegion.openHRegion(region, null);
    return new RegionAsTable(region);
  }

  /** Writes {@code address} into the server column of {@link #ROW_KEY}. */
  private static void putServerAddress(Table table, String address) throws IOException {
    Put put = new Put(ROW_KEY, EnvironmentEdgeManager.currentTime());
    put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
    table.put(put);
  }

  /** Validates the stored row both through region scanners and through a get on {@code table}. */
  private void validateStoredInfo(Table table, String serverName) throws IOException {
    scan(serverName);
    getRegionInfo(table);
  }

  /** Compare the RegionInfo we read from HBase to what we stored */
  private void validateRegionInfo(byte[] regionBytes) throws IOException {
    assertNotNull(regionBytes, "no serialized region info was read back");
    RegionInfo info = RegionInfo.parseFromOrNull(regionBytes);
    assertNotNull(info, "stored region info could not be parsed");

    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
    assertEquals(0, info.getStartKey().length);
    assertEquals(0, info.getEndKey().length);
    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
  }

  /**
   * Use scanners to get the region info and then validate the results. The region is scanned
   * twice: once selecting the whole catalog family and once selecting {@link #EXPLICIT_COLS}.
   * Every returned batch is checked, including the one delivered together with
   * {@code next() == false}; for this single-row table that is the only batch. The start code
   * column is no longer written by {@link #testScanner()}, so it is not validated here.
   * @param serverName expected server address, or {@code null} to skip the server column check
   */
  private void scan(String serverName) throws IOException {
    Scan familyScan = new Scan().withStartRow(FIRST_ROW);
    for (byte[] family : COLS) {
      familyScan.addFamily(family);
    }
    Scan columnScan = new Scan().withStartRow(FIRST_ROW);
    for (byte[] qualifier : EXPLICIT_COLS) {
      columnScan.addColumn(COLS[0], qualifier);
    }

    List<Cell> results = new ArrayList<>();
    for (Scan scan : new Scan[] { familyScan, columnScan }) {
      int rows = 0;
      try (InternalScanner scanner = region.getScanner(scan)) {
        boolean hasMore;
        do {
          results.clear();
          hasMore = scanner.next(results);
          if (!results.isEmpty()) {
            validateScannedRow(results, serverName);
            rows++;
          }
        } while (hasMore);
      }
      // All puts in testScanner target ROW_KEY, so exactly one row must come back.
      assertEquals(1, rows, "unexpected number of rows for " + scan);
    }
  }

  /** Validates the region info and (optionally) server columns of one scanned row. */
  private void validateScannedRow(List<Cell> results, String serverName) throws IOException {
    Cell regionInfoCell =
      getColumn(results, HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
    assertNotNull(regionInfoCell, "no region info column in " + results);
    validateRegionInfo(CellUtil.cloneValue(regionInfoCell));

    if (serverName != null) {
      Cell serverCell = getColumn(results, HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
      assertNotNull(serverCell, "no server column in " + results);
      byte[] val = CellUtil.cloneValue(serverCell);
      assertFalse(val.length == 0);
      assertEquals(serverName, Bytes.toString(val));
    }
  }

  /** Returns the first cell in {@code kvs} with the given family and qualifier, or null. */
  private Cell getColumn(final List<Cell> kvs, final byte[] family, final byte[] qualifier) {
    for (Cell kv : kvs) {
      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
        return kv;
      }
    }
    return null;
  }

  /** Use get to retrieve the RegionInfo and validate it */
  private void getRegionInfo(Table table) throws IOException {
    Get get = new Get(ROW_KEY);
    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
    Result result = table.get(get);
    validateRegionInfo(result.value());
  }

  /**
   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner update
   * readers code essentially. This is not highly concurrent, since its all 1 thread. HBase-910.
   */
  @Test
  public void testScanAndSyncFlush() throws Exception {
    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
    try {
      Table hri = new RegionAsTable(region);
      LOG.info("Added: {}", HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
        Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
      int count = count(hri, -1, false);
      assertEquals(count, count(hri, 100, false)); // do a sync flush.
    } catch (Exception e) {
      LOG.error("Failed", e);
      throw e;
    } finally {
      HBaseTestingUtil.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both the
   * StoreScanner update readers and the transition from memstore -> snapshot -> store file.
   */
  @Test
  public void testScanAndRealConcurrentFlush() throws Exception {
    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
    try {
      Table hri = new RegionAsTable(region);
      LOG.info("Added: {}", HTestConst.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
        Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
      int count = count(hri, -1, false);
      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
    } catch (Exception e) {
      LOG.error("Failed", e);
      throw e;
    } finally {
      HBaseTestingUtil.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Make sure scanner returns correct result when we run a major compaction with deletes.
   */
  @Test
  public void testScanAndConcurrentMajorCompact(TestInfo testInfo) throws Exception {
    TableDescriptor htd =
      TEST_UTIL.createTableDescriptor(TableName.valueOf(testInfo.getTestMethod().get().getName()),
        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);

    try {
      Table hri = new RegionAsTable(region);
      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes,
        secondRowBytes);
      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes,
        secondRowBytes);

      Delete dc = new Delete(firstRowBytes);
      /* delete column1 of firstRow */
      dc.addColumns(fam1, col1);
      region.delete(dc);
      region.flush(true);

      HTestConst.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1), secondRowBytes,
        thirdRowBytes);
      HTestConst.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1), secondRowBytes,
        thirdRowBytes);
      region.flush(true);

      // The scanner is opened BEFORE the compaction on purpose and closed when the checks end.
      try (InternalScanner s = region.getScanner(new Scan())) {
        // run a major compact, column1 of firstRow will be cleaned.
        region.compact(true);

        List<Cell> results = new ArrayList<>();
        s.next(results);

        // make sure returns column2 of firstRow
        assertEquals(1, results.size(), "result is not correct, keyValues : " + results);
        assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes));
        assertTrue(CellUtil.matchingFamily(results.get(0), fam2));

        results = new ArrayList<>();
        s.next(results);

        // get secondRow
        assertEquals(2, results.size());
        assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
        assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
        assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
      }
    } finally {
      HBaseTestingUtil.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Count table. Any flush thread started here has finished by the time this method returns, so
   * callers can safely close the region afterwards.
   * @param countTable Table
   * @param flushIndex At what row we start the flush.
   * @param concurrent if the flush should be concurrent or sync.
   * @return Count of rows found.
   */
  private int count(final Table countTable, final int flushIndex, boolean concurrent)
    throws Exception {
    LOG.info("Taking out counting scan");
    Scan scan = new Scan();
    for (byte[] qualifier : EXPLICIT_COLS) {
      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
    }
    int count = 0;
    boolean justFlushed = false;
    Thread flusher = null;
    try (ResultScanner s = countTable.getScanner(scan)) {
      while (s.next() != null) {
        if (justFlushed) {
          LOG.info("after next() just after next flush");
          justFlushed = false;
        }
        count++;
        if (flushIndex == count) {
          LOG.info("Starting flush at flush index {}", flushIndex);
          flusher = new Thread(() -> {
            try {
              region.flush(true);
              LOG.info("Finishing flush");
            } catch (IOException e) {
              // Best effort, as before; log the cause so failures are diagnosable.
              LOG.info("Failed flush cache", e);
            }
          });
          flusher.start();
          if (!concurrent) {
            // sync flush
            flusher.join();
          }
          LOG.info("Continuing on after kicking off background flush");
          justFlushed = true;
        }
      }
    } finally {
      // In concurrent mode the flush overlaps the scan; wait for it so it cannot race with the
      // caller closing the region.
      if (flusher != null) {
        flusher.join();
      }
    }
    LOG.info("Found {} items", count);
    return count;
  }
}