/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test of a long-lived scanner validating as we go.
 */
@Category({RegionServerTests.class, SmallTests.class})
public class TestScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestScanner.class);

  @Rule public TestName name = new TestName();

  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
  private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
  private static final byte [][] EXPLICIT_COLS = {
    HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
      // TODO ryan
      //HConstants.STARTCODE_QUALIFIER
  };

  static final HTableDescriptor TESTTABLEDESC =
    new HTableDescriptor(TableName.valueOf("testscanner"));
  static {
    TESTTABLEDESC.addFamily(
        new HColumnDescriptor(HConstants.CATALOG_FAMILY)
            // Ten is an arbitrary number.  Keep versions to help debugging.
            .setMaxVersions(10)
            .setBlockCacheEnabled(false)
            .setBlocksize(8 * 1024)
    );
  }

  /** HRegionInfo for root region */
  public static final HRegionInfo REGION_INFO =
    new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
    HConstants.EMPTY_BYTE_ARRAY);

  private static final byte [] ROW_KEY = REGION_INFO.getRegionName();

  private static final long START_CODE = Long.MAX_VALUE;

  /** Region under test; each test creates it and closes it in its own {@code finally}. */
  private HRegion region;

  private final byte[] firstRowBytes;
  private final byte[] secondRowBytes;
  private final byte[] thirdRowBytes;
  private final byte[] col1;

  /**
   * Derives three consecutive row keys from {@code START_KEY_BYTES} by bumping the last byte
   * by zero, one and two respectively.
   */
  public TestScanner() {
    int lastIndex = START_KEY_BYTES.length - 1;

    firstRowBytes = START_KEY_BYTES;
    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[lastIndex]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[lastIndex] = (byte) (thirdRowBytes[lastIndex] + 2);
    col1 = Bytes.toBytes("column1");
  }

  /**
   * Test basic stop row filter works.
   */
  @Test
  public void testStopRow() throws Exception {
    byte [] startrow = Bytes.toBytes("bbb");
    byte [] stoprow = Bytes.toBytes("ccc");
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
      List<Cell> results = new ArrayList<>();
      // Do simple test of getting one row only first.
      Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
      scan.addFamily(HConstants.CATALOG_FAMILY);

      int count = 0;
      InternalScanner s = region.getScanner(scan);
      try {
        // The loop body only runs for calls to next() that return true; the expected
        // count of zero below relies on next() returning false for the only row.
        while (s.next(results)) {
          count++;
        }
      } finally {
        s.close();
      }
      assertEquals(0, count);

      // Now do something a bit more involved.
      scan = new Scan(startrow, stoprow);
      scan.addFamily(HConstants.CATALOG_FAMILY);

      count = 0;
      results = new ArrayList<>();
      s = region.getScanner(scan);
      try {
        for (boolean first = true; s.next(results);) {
          Cell kv = results.get(0);
          if (first) {
            assertTrue(CellUtil.matchingRows(kv, startrow));
            first = false;
          }
          // Every row handed back must sort strictly before the stop row.
          assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
          count++;
          // Clear so that get(0) on the next pass is the first cell of the *next* row
          // rather than the first cell ever returned.
          results.clear();
        }
      } finally {
        s.close();
      }
      // We got something back.
      assertTrue(count > 10);
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Runs {@code scan} over the catalog family and asserts that every returned cell's row
   * starts with the bytes {@code 'a'}, {@code 'b'}.
   *
   * @param scan scan to execute; the catalog family is added to it here
   * @throws IOException if the scanner fails
   */
  void rowPrefixFilter(Scan scan) throws IOException {
    List<Cell> results = new ArrayList<>();
    scan.addFamily(HConstants.CATALOG_FAMILY);
    InternalScanner s = region.getScanner(scan);
    try {
      boolean hasMore = true;
      while (hasMore) {
        hasMore = s.next(results);
        for (Cell kv : results) {
          byte[] row = CellUtil.cloneRow(kv);
          assertEquals((byte)'a', row[0]);
          assertEquals((byte)'b', row[1]);
        }
        results.clear();
      }
    } finally {
      s.close();
    }
  }

  /**
   * Runs {@code scan} over the catalog family and asserts that no returned cell has a row
   * that sorts after {@code stopRow}.
   *
   * @param scan scan to execute; the catalog family is added to it here
   * @param stopRow inclusive upper bound every returned row is compared against
   * @throws IOException if the scanner fails
   */
  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
    List<Cell> results = new ArrayList<>();
    scan.addFamily(HConstants.CATALOG_FAMILY);
    InternalScanner s = region.getScanner(scan);
    try {
      boolean hasMore = true;
      while (hasMore) {
        hasMore = s.next(results);
        for (Cell kv : results) {
          assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
        }
        results.clear();
      }
    } finally {
      s.close();
    }
  }

  /**
   * Exercises a {@link PrefixFilter} and a {@link WhileMatchFilter}-wrapped
   * {@link InclusiveStopFilter} against a freshly populated region.
   */
  @Test
  public void testFilters() throws IOException {
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
      byte [] prefix = Bytes.toBytes("ab");
      Filter newFilter = new PrefixFilter(prefix);
      Scan scan = new Scan();
      scan.setFilter(newFilter);
      rowPrefixFilter(scan);

      byte[] stopRow = Bytes.toBytes("bbc");
      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
      scan = new Scan();
      scan.setFilter(newFilter);
      rowInclusiveStopFilter(scan, stopRow);
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Test that closing a scanner while a client is using it doesn't throw
   * NPEs but instead a UnknownScannerException. HBASE-2503
   */
  @Test
  public void testRaceBetweenClientAndTimeout() throws Exception {
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
      Scan scan = new Scan();
      InternalScanner s = region.getScanner(scan);
      List<Cell> results = new ArrayList<>();
      try {
        s.next(results);
        s.close();
        // Using the scanner after close() is the condition under test.
        s.next(results);
        fail("We don't want anything more, we should be failing");
      } catch (UnknownScannerException ex) {
        // ok!
      }
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Writes a region-info cell and then server-address cells to a single row, re-validating
   * via both scan and get after each write, each flush and each close/re-open of the region.
   */
  @Test
  public void testScanner() throws IOException {
    try {
      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      Table table = new RegionAsTable(region);

      // Write information to the meta table

      Put put = new Put(ROW_KEY, System.currentTimeMillis());

      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
          REGION_INFO.toByteArray());
      table.put(put);

      // What we just committed is in the memstore. Verify that we can get
      // it back both with scanning and get

      scan(false, null);
      getRegionInfo(table);

      // Close and re-open

      region.close();
      region = HRegion.openHRegion(region, null);
      table = new RegionAsTable(region);

      // Verify we can get the data back now that it is on disk.

      scan(false, null);
      getRegionInfo(table);

      // Store some new information

      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();

      put = new Put(ROW_KEY, System.currentTimeMillis());
      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
          Bytes.toBytes(address));

//      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));

      table.put(put);

      // Validate that we can still get the HRegionInfo, even though it is in
      // an older row on disk and there is a newer row in the memstore

      scan(true, address);
      getRegionInfo(table);

      // flush cache
      this.region.flush(true);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

      // Close and reopen

      region.close();
      region = HRegion.openHRegion(region, null);
      table = new RegionAsTable(region);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

      // Now update the information again

      address = "bar.foo.com:4321";

      put = new Put(ROW_KEY, System.currentTimeMillis());

      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
      table.put(put);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

      // flush cache

      region.flush(true);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

      // Close and reopen

      this.region.close();
      this.region = HRegion.openHRegion(region, null);
      table = new RegionAsTable(this.region);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

    } finally {
      // clean up
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Compare the HRegionInfo we read from HBase to what we stored.
   *
   * @param regionBytes serialized region info as read back from the table
   * @throws IOException declared for callers; not thrown by the visible body
   */
  private void validateRegionInfo(byte [] regionBytes) throws IOException {
    HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
    // Fail with a readable message instead of an NPE on the dereferences below.
    assertNotNull("Stored bytes did not parse to an HRegionInfo", info);

    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
    assertEquals(0, info.getStartKey().length);
    assertEquals(0, info.getEndKey().length);
    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
    //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
  }

  /**
   * Use a scanner to get the region info and then validate the results.
   *
   * <p>NOTE(review): the outer loop runs once per entry of {@code scanColumns} but never
   * reads {@code scanColumns[i]}; both passes build the same scan from {@code COLS[0]} and
   * {@code EXPLICIT_COLS}. Left as-is to preserve behaviour.
   *
   * <p>NOTE(review): the checks run only for calls to {@code next()} that return true. If
   * {@code next()} returns false while delivering the last row, a single-row region is never
   * validated here -- confirm against the InternalScanner contract before relying on this.
   *
   * @param validateStartcode when true, also decodes the current value as a long and compares
   *     it with {@code START_CODE}
   * @param serverName expected value of the server column, or {@code null} to skip that check
   * @throws IOException if the scanner fails
   */
  private void scan(boolean validateStartcode, String serverName)
      throws IOException {
    List<Cell> results = new ArrayList<>();
    byte [][][] scanColumns = {
      COLS,
      EXPLICIT_COLS
    };

    for (int i = 0; i < scanColumns.length; i++) {
      Scan scan = new Scan(FIRST_ROW);
      for (byte[] qualifier : EXPLICIT_COLS) {
        scan.addColumn(COLS[0], qualifier);
      }
      InternalScanner scanner = region.getScanner(scan);
      try {
        while (scanner.next(results)) {
          assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
              HConstants.REGIONINFO_QUALIFIER));
          byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
              HConstants.REGIONINFO_QUALIFIER));
          validateRegionInfo(val);
          if (validateStartcode) {
            // NOTE(review): with the STARTCODE lookup commented out, val still holds the
            // REGIONINFO value when it is decoded below.
//            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
//                HConstants.STARTCODE_QUALIFIER));
//            val = getColumn(results, HConstants.CATALOG_FAMILY,
//                HConstants.STARTCODE_QUALIFIER).getValue();
            assertNotNull(val);
            assertFalse(val.length == 0);
            long startCode = Bytes.toLong(val);
            assertEquals(START_CODE, startCode);
          }

          if (serverName != null) {
            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
                HConstants.SERVER_QUALIFIER));
            val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
                HConstants.SERVER_QUALIFIER));
            assertNotNull(val);
            assertFalse(val.length == 0);
            String server = Bytes.toString(val);
            assertEquals(0, server.compareTo(serverName));
          }
        }
      } finally {
        scanner.close();
      }
    }
  }

  /**
   * @return true if any cell in {@code kvs} matches both {@code family} and {@code qualifier}
   */
  private boolean hasColumn(final List<Cell> kvs, final byte [] family,
      final byte [] qualifier) {
    return getColumn(kvs, family, qualifier) != null;
  }

  /**
   * @return the first cell in {@code kvs} matching both {@code family} and {@code qualifier},
   *     or {@code null} if there is none
   */
  private Cell getColumn(final List<Cell> kvs, final byte [] family,
      final byte [] qualifier) {
    for (Cell kv: kvs) {
      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
        return kv;
      }
    }
    return null;
  }

  /** Use get to retrieve the HRegionInfo and validate it */
  private void getRegionInfo(Table table) throws IOException {
    Get get = new Get(ROW_KEY);
    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
    Result result = table.get(get);
    byte [] bytes = result.value();
    validateRegionInfo(bytes);
  }

  /**
   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
   * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
   * HBase-910.
   */
  @Test
  public void testScanAndSyncFlush() throws Exception {
    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
    Table hri = new RegionAsTable(region);
    try {
      LOG.info("Added: " +
        HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
          Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
      int count = count(hri, -1, false);
      assertEquals(count, count(hri, 100, false)); // do a sync flush.
    } catch (Exception e) {
      LOG.error("Failed", e);
      throw e;
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
   * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
   */
  @Test
  public void testScanAndRealConcurrentFlush() throws Exception {
    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
    Table hri = new RegionAsTable(region);
    try {
      LOG.info("Added: " +
        HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
          Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
      int count = count(hri, -1, false);
      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
    } catch (Exception e) {
      LOG.error("Failed", e);
      throw e;
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Make sure scanner returns correct result when we run a major compaction
   * with deletes.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testScanAndConcurrentMajorCompact() throws Exception {
    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
    Table hri = new RegionAsTable(region);

    try {
      HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
          firstRowBytes, secondRowBytes);
      HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
          firstRowBytes, secondRowBytes);

      Delete dc = new Delete(firstRowBytes);
      /* delete column1 of firstRow */
      dc.addColumns(fam1, col1);
      region.delete(dc);
      region.flush(true);

      HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
          secondRowBytes, thirdRowBytes);
      HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
          secondRowBytes, thirdRowBytes);
      region.flush(true);

      // Open the scanner *before* compacting so the compaction happens underneath it.
      InternalScanner s = region.getScanner(new Scan());
      try {
        // run a major compact, column1 of firstRow will be cleaned.
        region.compact(true);

        List<Cell> results = new ArrayList<>();
        s.next(results);

        // make sure returns column2 of firstRow
        assertEquals("result is not correct, keyValues : " + results, 1, results.size());
        assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes));
        assertTrue(CellUtil.matchingFamily(results.get(0), fam2));

        results = new ArrayList<>();
        s.next(results);

        // get secondRow
        assertEquals(2, results.size());
        assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
        assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
        assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
      } finally {
        s.close();
      }
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Counts the rows returned by a scan over {@code EXPLICIT_COLS}, optionally flushing the
   * region once the running count reaches {@code flushIndex}.
   *
   * @param countTable table to scan
   * @param flushIndex row count at which the flush is triggered; a value the count never
   *     reaches (e.g. -1) means no flush
   * @param concurrent if true the flush runs on a separate thread, which is joined before
   *     this method returns; if false it runs inline on the calling thread
   * @return Count of rows found.
   * @throws IOException if the scan fails or the wait for the flush thread is interrupted
   */
  private int count(final Table countTable, final int flushIndex, boolean concurrent)
      throws IOException {
    LOG.info("Taking out counting scan");
    Scan scan = new Scan();
    for (byte [] qualifier: EXPLICIT_COLS) {
      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
    }
    int count = 0;
    Thread backgroundFlush = null;
    ResultScanner s = countTable.getScanner(scan);
    try {
      boolean justFlushed = false;
      while (s.next() != null) {
        if (justFlushed) {
          LOG.info("after next() just after next flush");
          justFlushed = false;
        }
        count++;
        if (flushIndex == count) {
          LOG.info("Starting flush at flush index " + flushIndex);
          Thread t = new Thread() {
            @Override
            public void run() {
              try {
                region.flush(true);
                LOG.info("Finishing flush");
              } catch (IOException e) {
                // Keep the cause so a failed flush can be diagnosed from the log.
                LOG.error("Failed flush cache", e);
              }
            }
          };
          if (concurrent) {
            t.start(); // concurrently flush.
            backgroundFlush = t;
          } else {
            t.run(); // sync flush
          }
          LOG.info("Continuing on after kicking off background flush");
          justFlushed = true;
        }
      }
    } finally {
      s.close();
    }
    // Callers close the region right after this returns; do not leave a flush in flight.
    awaitFlush(backgroundFlush);
    LOG.info("Found " + count + " items");
    return count;
  }

  /**
   * Waits for the background flush thread, if one was started, to finish.
   *
   * @param flusher thread to wait for, or {@code null} if no background flush was started
   * @throws InterruptedIOException if the wait is interrupted; the interrupt flag is restored
   */
  private static void awaitFlush(Thread flusher) throws IOException {
    if (flusher == null) {
      return;
    }
    try {
      flusher.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      InterruptedIOException iioe =
          new InterruptedIOException("Interrupted while waiting for background flush");
      iioe.initCause(e);
      throw iioe;
    }
  }
}