/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test of a long-lived scanner validating as we go.
 *
 * <p>Every test creates its own local region through {@link HBaseTestingUtility}, stores it in
 * {@link #region}, and closes the region and its WAL in a {@code finally} block.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestScanner.class);

  @Rule public TestName name = new TestName();

  private static final Logger LOG = LoggerFactory.getLogger(TestScanner.class);
  private static final HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  private static final byte[] FIRST_ROW = HConstants.EMPTY_START_ROW;
  private static final byte[][] COLS = { HConstants.CATALOG_FAMILY };
  private static final byte[][] EXPLICIT_COLS = {
    HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
      // TODO ryan
      //HConstants.STARTCODE_QUALIFIER
  };

  static final HTableDescriptor TESTTABLEDESC =
    new HTableDescriptor(TableName.valueOf("testscanner"));
  static {
    TESTTABLEDESC.addFamily(
        new HColumnDescriptor(HConstants.CATALOG_FAMILY)
            // Ten is an arbitrary number.  Keep versions to help debugging.
            .setMaxVersions(10)
            .setBlockCacheEnabled(false)
            .setBlocksize(8 * 1024)
    );
  }

  /** HRegionInfo for root region */
  public static final HRegionInfo REGION_INFO =
    new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
    HConstants.EMPTY_BYTE_ARRAY);

  private static final byte[] ROW_KEY = REGION_INFO.getRegionName();

  private static final long START_CODE = Long.MAX_VALUE;

  /** Region under test; (re)created by each test method. */
  private HRegion region;

  private final byte[] firstRowBytes;
  private final byte[] secondRowBytes;
  private final byte[] thirdRowBytes;
  private final byte[] col1;

  /**
   * Builds three consecutive row keys from {@code START_KEY_BYTES}: the key itself, and two
   * copies whose last byte is incremented by one and by two respectively.
   */
  public TestScanner() {
    int last = START_KEY_BYTES.length - 1;
    firstRowBytes = START_KEY_BYTES;
    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[last]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[last] = (byte) (thirdRowBytes[last] + 2);
    col1 = Bytes.toBytes("column1");
  }

  /**
   * Test basic stop row filter works.
   */
  @Test
  public void testStopRow() throws Exception {
    byte[] startrow = Bytes.toBytes("bbb");
    byte[] stoprow = Bytes.toBytes("ccc");
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
      List<Cell> results = new ArrayList<>();
      // Do simple test of getting one row only first.
      Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
      scan.addFamily(HConstants.CATALOG_FAMILY);

      int count = 0;
      try (InternalScanner s = region.getScanner(scan)) {
        // Counts only the calls that report more rows to come.
        while (s.next(results)) {
          count++;
        }
      }
      assertEquals(0, count);

      // Now do something a bit more involved.
      scan = new Scan(startrow, stoprow);
      scan.addFamily(HConstants.CATALOG_FAMILY);

      count = 0;
      Cell kv = null;
      results = new ArrayList<>();
      try (InternalScanner s = region.getScanner(scan)) {
        boolean first = true;
        while (s.next(results)) {
          kv = results.get(0);
          if (first) {
            assertTrue(CellUtil.matchingRows(kv, startrow));
            first = false;
          }
          count++;
          // Drop this row's cells so that get(0) on the next pass is the next row's first
          // cell; without this, kv would stay pinned to the very first row returned.
          results.clear();
        }
      }
      assertNotNull("Scan returned no rows", kv);
      // The last row seen inside the loop must sort before the (exclusive) stop row.
      assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
      // We got something back.
      assertTrue(count > 10);
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Runs {@code scan} over the catalog family of {@link #region} and asserts that the row of
   * every returned cell starts with the bytes {@code 'a'}, {@code 'b'}.
   *
   * @param scan scan to run; the catalog family is added to it
   * @throws IOException if the scanner fails
   */
  void rowPrefixFilter(Scan scan) throws IOException {
    List<Cell> results = new ArrayList<>();
    scan.addFamily(HConstants.CATALOG_FAMILY);
    try (InternalScanner s = region.getScanner(scan)) {
      boolean hasMore = true;
      while (hasMore) {
        hasMore = s.next(results);
        for (Cell kv : results) {
          byte[] row = CellUtil.cloneRow(kv);
          assertEquals((byte) 'a', row[0]);
          assertEquals((byte) 'b', row[1]);
        }
        results.clear();
      }
    }
  }

  /**
   * Runs {@code scan} over the catalog family of {@link #region} and asserts that no returned
   * cell has a row sorting after {@code stopRow}.
   *
   * @param scan scan to run; the catalog family is added to it
   * @param stopRow inclusive upper bound expected for every returned row
   * @throws IOException if the scanner fails
   */
  void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
    List<Cell> results = new ArrayList<>();
    scan.addFamily(HConstants.CATALOG_FAMILY);
    try (InternalScanner s = region.getScanner(scan)) {
      boolean hasMore = true;
      while (hasMore) {
        hasMore = s.next(results);
        for (Cell kv : results) {
          assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
        }
        results.clear();
      }
    }
  }

  /**
   * Exercises a {@link PrefixFilter} scan and a
   * {@link WhileMatchFilter}-wrapped {@link InclusiveStopFilter} scan.
   */
  @Test
  public void testFilters() throws IOException {
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
      byte[] prefix = Bytes.toBytes("ab");
      Filter newFilter = new PrefixFilter(prefix);
      Scan scan = new Scan();
      scan.setFilter(newFilter);
      rowPrefixFilter(scan);

      byte[] stopRow = Bytes.toBytes("bbc");
      newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
      scan = new Scan();
      scan.setFilter(newFilter);
      rowInclusiveStopFilter(scan, stopRow);
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Test that closing a scanner while a client is using it doesn't throw
   * NPEs but instead a UnknownScannerException. HBASE-2503
   */
  @Test
  public void testRaceBetweenClientAndTimeout() throws Exception {
    try {
      this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      HBaseTestCase.addContent(this.region, HConstants.CATALOG_FAMILY);
      Scan scan = new Scan();
      InternalScanner s = region.getScanner(scan);
      List<Cell> results = new ArrayList<>();
      try {
        s.next(results);
        s.close();
        // Using the scanner after close() is the scenario under test.
        s.next(results);
        fail("We don't want anything more, we should be failing");
      } catch (UnknownScannerException ex) {
        // ok!
      }
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * The test! Writes region info and server address cells for {@link #ROW_KEY}, then re-reads
   * them via scan and get after each of: the write, a flush, and a close/re-open of the region.
   */
  @Test
  public void testScanner() throws IOException {
    try {
      region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
      Table table = new RegionAsTable(region);

      // Write information to the meta table

      Put put = new Put(ROW_KEY, System.currentTimeMillis());

      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
          REGION_INFO.toByteArray());
      table.put(put);

      // What we just committed is in the memstore. Verify that we can get
      // it back both with scanning and get

      scan(false, null);
      getRegionInfo(table);

      // Close and re-open

      region.close();
      region = HRegion.openHRegion(region, null);
      table = new RegionAsTable(region);

      // Verify we can get the data back now that it is on disk.

      scan(false, null);
      getRegionInfo(table);

      // Store some new information

      String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();

      put = new Put(ROW_KEY, System.currentTimeMillis());
      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
          Bytes.toBytes(address));

//      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));

      table.put(put);

      // Validate that we can still get the HRegionInfo, even though it is in
      // an older row on disk and there is a newer row in the memstore

      scan(true, address);
      getRegionInfo(table);

      // flush cache
      this.region.flush(true);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

      // Close and reopen

      region.close();
      region = HRegion.openHRegion(region, null);
      table = new RegionAsTable(region);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

      // Now update the information again

      address = "bar.foo.com:4321";

      put = new Put(ROW_KEY, System.currentTimeMillis());

      put.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(address));
      table.put(put);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

      // flush cache

      region.flush(true);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

      // Close and reopen

      this.region.close();
      this.region = HRegion.openHRegion(region, null);
      table = new RegionAsTable(this.region);

      // Validate again

      scan(true, address);
      getRegionInfo(table);

    } finally {
      // clean up
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Compare the HRegionInfo we read from HBase to what we stored.
   *
   * @param regionBytes serialized region info as read back from the region
   * @throws IOException declared for callers; not thrown by the visible code paths
   */
  private void validateRegionInfo(byte[] regionBytes) throws IOException {
    HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
    // Fail with a readable message instead of a NullPointerException below.
    assertNotNull("Could not parse HRegionInfo from stored bytes", info);

    assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
    assertEquals(0, info.getStartKey().length);
    assertEquals(0, info.getEndKey().length);
    assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
    //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
  }

  /**
   * Use a scanner to get the region info and then validate the results.
   *
   * @param validateStartcode whether to run the start-code checks on each row
   * @param serverName expected value of the server qualifier, or {@code null} to skip that check
   * @throws IOException if scanning fails
   */
  private void scan(boolean validateStartcode, String serverName)
      throws IOException {
    List<Cell> results = new ArrayList<>();
    // NOTE(review): both passes build the same scan (COLS[0] x EXPLICIT_COLS); scanColumns only
    // determines the number of passes. Looks like the second pass was meant to differ - confirm.
    byte[][][] scanColumns = { COLS, EXPLICIT_COLS };
    for (int i = 0; i < scanColumns.length; i++) {
      Scan scan = new Scan(FIRST_ROW);
      for (byte[] qualifier : EXPLICIT_COLS) {
        scan.addColumn(COLS[0], qualifier);
      }
      // Start each pass without cells left over from the previous one.
      results.clear();
      try (InternalScanner scanner = region.getScanner(scan)) {
        // NOTE(review): the body only runs while next() reports further rows; with the single
        // row written by testScanner it presumably never executes - confirm before relying on
        // the assertions below.
        while (scanner.next(results)) {
          assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
              HConstants.REGIONINFO_QUALIFIER));
          byte[] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
              HConstants.REGIONINFO_QUALIFIER));
          validateRegionInfo(val);
          if (validateStartcode) {
//            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
//                HConstants.STARTCODE_QUALIFIER));
//            val = getColumn(results, HConstants.CATALOG_FAMILY,
//                HConstants.STARTCODE_QUALIFIER).getValue();
            // NOTE(review): with the lookup above commented out, val still holds the region
            // info bytes here - TODO confirm the intended start-code source.
            assertNotNull(val);
            assertFalse(val.length == 0);
            long startCode = Bytes.toLong(val);
            assertEquals(START_CODE, startCode);
          }

          if (serverName != null) {
            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
                HConstants.SERVER_QUALIFIER));
            val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
                HConstants.SERVER_QUALIFIER));
            assertNotNull(val);
            assertFalse(val.length == 0);
            String server = Bytes.toString(val);
            assertEquals(0, server.compareTo(serverName));
          }
          // Do not let this row's cells satisfy hasColumn() for the next row.
          results.clear();
        }
      }
    }
  }

  /**
   * @return {@code true} if {@code kvs} contains a cell with the given family and qualifier
   */
  private boolean hasColumn(final List<Cell> kvs, final byte[] family,
      final byte[] qualifier) {
    for (Cell kv : kvs) {
      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
        return true;
      }
    }
    return false;
  }

  /**
   * @return the first cell in {@code kvs} with the given family and qualifier, or {@code null}
   *         if there is none
   */
  private Cell getColumn(final List<Cell> kvs, final byte[] family,
      final byte[] qualifier) {
    for (Cell kv : kvs) {
      if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
        return kv;
      }
    }
    return null;
  }


  /** Use get to retrieve the HRegionInfo and validate it */
  private void getRegionInfo(Table table) throws IOException {
    Get get = new Get(ROW_KEY);
    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
    Result result = table.get(get);
    byte[] bytes = result.value();
    validateRegionInfo(bytes);
  }

  /**
   * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
   * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
   * HBase-910.
   */
  @Test
  public void testScanAndSyncFlush() throws Exception {
    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
    Table hri = new RegionAsTable(region);
    try {
      LOG.info("Added: {}",
          HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
              Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
      int count = count(hri, -1, false);
      assertEquals(count, count(hri, 100, false)); // do a sync flush.
    } catch (Exception e) {
      LOG.error("Failed", e);
      throw e;
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
   * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
   */
  @Test
  public void testScanAndRealConcurrentFlush() throws Exception {
    this.region = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
    Table hri = new RegionAsTable(region);
    try {
      LOG.info("Added: {}",
          HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
              Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
      int count = count(hri, -1, false);
      assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
    } catch (Exception e) {
      LOG.error("Failed", e);
      throw e;
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }

  /**
   * Make sure scanner returns correct result when we run a major compaction
   * with deletes.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testScanAndConcurrentMajorCompact() throws Exception {
    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
    this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
    Table hri = new RegionAsTable(region);

    try {
      HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
          firstRowBytes, secondRowBytes);
      HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
          firstRowBytes, secondRowBytes);

      Delete dc = new Delete(firstRowBytes);
      /* delete column1 of firstRow */
      dc.addColumns(fam1, col1);
      region.delete(dc);
      region.flush(true);

      HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
          secondRowBytes, thirdRowBytes);
      HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
          secondRowBytes, thirdRowBytes);
      region.flush(true);

      // The scanner is opened BEFORE the compaction on purpose; it is now closed when the
      // block exits instead of being leaked.
      try (InternalScanner s = region.getScanner(new Scan())) {
        // run a major compact, column1 of firstRow will be cleaned.
        region.compact(true);

        List<Cell> results = new ArrayList<>();
        s.next(results);

        // make sure returns column2 of firstRow
        assertEquals("result is not correct, keyValues : " + results, 1, results.size());
        assertTrue(CellUtil.matchingRows(results.get(0), firstRowBytes));
        assertTrue(CellUtil.matchingFamily(results.get(0), fam2));

        results = new ArrayList<>();
        s.next(results);

        // get secondRow
        assertEquals(2, results.size());
        assertTrue(CellUtil.matchingRows(results.get(0), secondRowBytes));
        assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
        assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
      }
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
    }
  }


  /**
   * Counts the rows returned by a scan of the catalog family's explicit columns, flushing
   * {@link #region} once when the running count reaches {@code flushIndex}.
   *
   * @param countTable table to scan
   * @param flushIndex row count at which the flush is started; a value the count never reaches
   *     (e.g. {@code -1}) disables the flush
   * @param concurrent if the flush should run on a background thread ({@code true}) or on the
   *     calling thread ({@code false})
   * @return Count of rows found.
   * @throws IOException if scanning fails or the wait for the background flush is interrupted
   */
  private int count(final Table countTable, final int flushIndex, boolean concurrent)
      throws IOException {
    LOG.info("Taking out counting scan");
    Scan scan = new Scan();
    for (byte[] qualifier : EXPLICIT_COLS) {
      scan.addColumn(HConstants.CATALOG_FAMILY, qualifier);
    }
    Thread backgroundFlush = null;
    int count = 0;
    try (ResultScanner s = countTable.getScanner(scan)) {
      boolean justFlushed = false;
      while (s.next() != null) {
        if (justFlushed) {
          LOG.info("after next() just after next flush");
          justFlushed = false;
        }
        count++;
        if (flushIndex == count) {
          LOG.info("Starting flush at flush index {}", flushIndex);
          Runnable flush = new Runnable() {
            @Override
            public void run() {
              try {
                region.flush(true);
                LOG.info("Finishing flush");
              } catch (IOException e) {
                // Keep the scan going, but record why the flush failed.
                LOG.error("Failed flush cache", e);
              }
            }
          };
          if (concurrent) {
            backgroundFlush = new Thread(flush);
            backgroundFlush.start(); // concurrently flush.
          } else {
            flush.run(); // sync flush
          }
          LOG.info("Continuing on after kicking off background flush");
          justFlushed = true;
        }
      }
    } finally {
      // Callers close the region right after this returns; make sure the flush is done first.
      awaitFlush(backgroundFlush);
    }
    LOG.info("Found {} items", count);
    return count;
  }

  /**
   * Waits for the background flush thread, if any, to terminate.
   *
   * @param flusher thread to join; may be {@code null} when no background flush was started
   * @throws InterruptedIOException if the calling thread is interrupted while waiting
   */
  private static void awaitFlush(Thread flusher) throws InterruptedIOException {
    if (flusher == null) {
      return;
    }
    try {
      flusher.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      InterruptedIOException iioe =
          new InterruptedIOException("Interrupted while waiting for background flush");
      iioe.initCause(e);
      throw iioe;
    }
  }
}