001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.List; 024import java.util.stream.Collectors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellScanner; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HRegionInfo; 034import org.apache.hadoop.hbase.StartMiniClusterOption; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; 037import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; 038import org.apache.hadoop.hbase.regionserver.HRegion; 039import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 040import org.apache.hadoop.hbase.regionserver.HRegionServer; 041import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; 042import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; 043import org.apache.hadoop.hbase.testclassification.ClientTests; 044import org.apache.hadoop.hbase.testclassification.LargeTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.CommonFSUtils; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.hadoop.hbase.util.FSUtils; 049import org.apache.hadoop.hbase.util.HFileArchiveUtil; 050import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 051import org.junit.After; 052import org.junit.Assert; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Rule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.rules.TestName; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062@Category({ LargeTests.class, ClientTests.class }) 063public class TestTableSnapshotScanner { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestTableSnapshotScanner.class); 068 069 private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class); 070 private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 071 private static final int NUM_REGION_SERVERS = 2; 072 private static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2") }; 073 public static byte[] bbb = Bytes.toBytes("bbb"); 074 public static byte[] yyy = Bytes.toBytes("yyy"); 075 076 private FileSystem fs; 077 private Path rootDir; 078 private boolean clusterUp; 079 080 @Rule 081 public TestName name = new TestName(); 082 083 public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName, 084 int expectedRegionSize) throws Exception { 085 for (int i = 0; i < 100; i++) { 086 List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName); 087 if (hRegionInfoList.size() >= expectedRegionSize) { 088 break; 089 } 090 Thread.sleep(1000); 091 } 092 } 093 094 @Before 095 public void setupCluster() throws Exception { 096 setupConf(UTIL.getConfiguration()); 097 StartMiniClusterOption option = 098 StartMiniClusterOption.builder().numRegionServers(NUM_REGION_SERVERS) 099 .numDataNodes(NUM_REGION_SERVERS).createRootDir(true).build(); 100 UTIL.startMiniCluster(option); 101 clusterUp = true; 102 rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); 103 fs = rootDir.getFileSystem(UTIL.getConfiguration()); 104 } 105 106 @After 107 public void tearDownCluster() throws Exception { 108 if (clusterUp) { 109 UTIL.shutdownMiniCluster(); 110 } 111 } 112 113 protected void setupConf(Configuration conf) { 114 // Enable snapshot 115 conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); 116 } 117 118 public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName, 119 String snapshotName, int numRegions) throws Exception { 120 try { 121 util.deleteTable(tableName); 122 } catch (Exception ex) { 123 // ignore 124 } 125 126 if (numRegions > 1) { 127 util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions); 128 } else { 129 util.createTable(tableName, FAMILIES); 130 } 131 Admin admin = util.getAdmin(); 132 133 // put some stuff in the table 134 Table table = util.getConnection().getTable(tableName); 135 util.loadTable(table, FAMILIES); 136 137 Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration()); 138 FileSystem fs = rootDir.getFileSystem(util.getConfiguration()); 139 140 SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES), null, 141 snapshotName, rootDir, fs, true); 142 143 // load different values 144 byte[] value = Bytes.toBytes("after_snapshot_value"); 145 util.loadTable(table, FAMILIES, value); 146 147 // cause flush to create new files in the region 148 admin.flush(tableName); 149 table.close(); 150 } 151 152 @Test 153 public void testNoDuplicateResultsWhenSplitting() throws Exception { 154 TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting"); 155 String snapshotName = "testSnapshotBug"; 156 try { 157 if (UTIL.getAdmin().tableExists(tableName)) { 158 UTIL.deleteTable(tableName); 159 } 160 161 UTIL.createTable(tableName, FAMILIES); 162 Admin admin = UTIL.getAdmin(); 163 164 // put some stuff in the table 165 Table table = UTIL.getConnection().getTable(tableName); 166 UTIL.loadTable(table, FAMILIES); 167 168 // split to 2 regions 169 admin.split(tableName, Bytes.toBytes("eee")); 170 blockUntilSplitFinished(UTIL, tableName, 2); 171 172 Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration()); 173 FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration()); 174 175 SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES), 176 null, snapshotName, rootDir, fs, true); 177 178 // load different values 179 byte[] value = Bytes.toBytes("after_snapshot_value"); 180 UTIL.loadTable(table, FAMILIES, value); 181 182 // cause flush to create new files in the region 183 admin.flush(tableName); 184 table.close(); 185 186 Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName); 187 Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan 188 189 TableSnapshotScanner scanner = 190 new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan); 191 192 verifyScanner(scanner, bbb, yyy); 193 scanner.close(); 194 } catch (Exception e) { 195 e.printStackTrace(); 196 } finally { 197 UTIL.getAdmin().deleteSnapshot(snapshotName); 198 UTIL.deleteTable(tableName); 199 } 200 } 201 202 @Test 203 public void testScanLimit() throws Exception { 204 final TableName tableName = TableName.valueOf(name.getMethodName()); 205 final String snapshotName = tableName + "Snapshot"; 206 TableSnapshotScanner scanner = null; 207 try { 208 createTableAndSnapshot(UTIL, tableName, snapshotName, 50); 209 Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName); 210 Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan 211 212 scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan); 213 int count = 0; 214 while (true) { 215 Result result = scanner.next(); 216 if (result == null) { 217 break; 218 } 219 count++; 220 } 221 Assert.assertEquals(100, count); 222 } finally { 223 if (scanner != null) { 224 scanner.close(); 225 } 226 UTIL.getAdmin().deleteSnapshot(snapshotName); 227 UTIL.deleteTable(tableName); 228 } 229 } 230 231 @Test 232 public void testWithSingleRegion() throws Exception { 233 testScanner(UTIL, "testWithSingleRegion", 1, false); 234 } 235 236 @Test 237 public void testWithMultiRegion() throws Exception { 238 testScanner(UTIL, "testWithMultiRegion", 10, false); 239 } 240 241 @Test 242 public void testWithOfflineHBaseMultiRegion() throws Exception { 243 testScanner(UTIL, "testWithMultiRegion", 20, true); 244 } 245 246 @Test 247 public void testScannerWithRestoreScanner() throws Exception { 248 TableName tableName = TableName.valueOf("testScanner"); 249 String snapshotName = "testScannerWithRestoreScanner"; 250 try { 251 createTableAndSnapshot(UTIL, tableName, snapshotName, 50); 252 Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName); 253 Scan scan = new Scan(bbb, yyy); // limit the scan 254 255 Configuration conf = UTIL.getConfiguration(); 256 Path rootDir = CommonFSUtils.getRootDir(conf); 257 258 TableSnapshotScanner scanner0 = 259 new TableSnapshotScanner(conf, restoreDir, snapshotName, scan); 260 verifyScanner(scanner0, bbb, yyy); 261 scanner0.close(); 262 263 // restore snapshot. 264 RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); 265 266 // scan the snapshot without restoring snapshot 267 TableSnapshotScanner scanner = 268 new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true); 269 verifyScanner(scanner, bbb, yyy); 270 scanner.close(); 271 272 // check whether the snapshot has been deleted by the close of scanner. 273 scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true); 274 verifyScanner(scanner, bbb, yyy); 275 scanner.close(); 276 277 // restore snapshot again. 278 RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); 279 280 // check whether the snapshot has been deleted by the close of scanner. 281 scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true); 282 verifyScanner(scanner, bbb, yyy); 283 scanner.close(); 284 } finally { 285 UTIL.getAdmin().deleteSnapshot(snapshotName); 286 UTIL.deleteTable(tableName); 287 } 288 } 289 290 private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions, 291 boolean shutdownCluster) throws Exception { 292 TableName tableName = TableName.valueOf("testScanner"); 293 try { 294 createTableAndSnapshot(util, tableName, snapshotName, numRegions); 295 296 if (shutdownCluster) { 297 util.shutdownMiniHBaseCluster(); 298 clusterUp = false; 299 } 300 301 Path restoreDir = util.getDataTestDirOnTestFS(snapshotName); 302 Scan scan = new Scan(bbb, yyy); // limit the scan 303 304 TableSnapshotScanner scanner = 305 new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan); 306 307 verifyScanner(scanner, bbb, yyy); 308 scanner.close(); 309 } finally { 310 if (clusterUp) { 311 util.getAdmin().deleteSnapshot(snapshotName); 312 util.deleteTable(tableName); 313 } 314 } 315 } 316 317 private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow) 318 throws IOException, InterruptedException { 319 320 HBaseTestingUtility.SeenRowTracker rowTracker = 321 new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); 322 323 while (true) { 324 Result result = scanner.next(); 325 if (result == null) { 326 break; 327 } 328 verifyRow(result); 329 rowTracker.addRow(result.getRow()); 330 } 331 332 // validate all rows are seen 333 rowTracker.validate(); 334 } 335 336 private static void verifyRow(Result result) throws IOException { 337 byte[] row = result.getRow(); 338 CellScanner scanner = result.cellScanner(); 339 while (scanner.advance()) { 340 Cell cell = scanner.current(); 341 342 // assert that all Cells in the Result have the same key 343 Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, cell.getRowArray(), 344 cell.getRowOffset(), cell.getRowLength())); 345 } 346 347 for (int j = 0; j < FAMILIES.length; j++) { 348 byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]); 349 Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) 350 + " ,actual:" + Bytes.toString(actual), row, actual); 351 } 352 } 353 354 @Test 355 public void testMergeRegion() throws Exception { 356 TableName tableName = TableName.valueOf("testMergeRegion"); 357 String snapshotName = tableName.getNameAsString() + "_snapshot"; 358 Configuration conf = UTIL.getConfiguration(); 359 Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); 360 long timeout = 20000; // 20s 361 try (Admin admin = UTIL.getAdmin()) { 362 List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName()) 363 .collect(Collectors.toList()); 364 // create table with 3 regions 365 Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3); 366 List<RegionInfo> regions = admin.getRegions(tableName); 367 Assert.assertEquals(3, regions.size()); 368 RegionInfo region0 = regions.get(0); 369 RegionInfo region1 = regions.get(1); 370 RegionInfo region2 = regions.get(2); 371 // put some data in the table 372 UTIL.loadTable(table, FAMILIES); 373 admin.flush(tableName); 374 // wait flush is finished 375 UTIL.waitFor(timeout, () -> { 376 try { 377 Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName); 378 for (RegionInfo region : regions) { 379 Path regionDir = new Path(tableDir, region.getEncodedName()); 380 for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) { 381 for (FileStatus fs : fs.listStatus(familyDir)) { 382 if (!fs.getPath().getName().equals(".filelist")) { 383 return true; 384 } 385 } 386 return false; 387 } 388 } 389 return true; 390 } catch (IOException e) { 391 LOG.warn("Failed check if flush is finished", e); 392 return false; 393 } 394 }); 395 // merge 2 regions 396 admin.compactionSwitch(false, serverList); 397 admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(), 398 true); 399 UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2); 400 List<RegionInfo> mergedRegions = admin.getRegions(tableName); 401 RegionInfo mergedRegion = 402 mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName()) 403 ? mergedRegions.get(1) 404 : mergedRegions.get(0); 405 // snapshot 406 admin.snapshot(snapshotName, tableName); 407 Assert.assertEquals(1, admin.listSnapshots().size()); 408 // major compact 409 admin.compactionSwitch(true, serverList); 410 admin.majorCompactRegion(mergedRegion.getRegionName()); 411 // wait until merged region has no reference 412 UTIL.waitFor(timeout, () -> { 413 try { 414 for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster() 415 .getRegionServerThreads()) { 416 HRegionServer regionServer = regionServerThread.getRegionServer(); 417 for (HRegion subRegion : regionServer.getRegions(tableName)) { 418 if ( 419 subRegion.getRegionInfo().getEncodedName().equals(mergedRegion.getEncodedName()) 420 ) { 421 regionServer.getCompactedHFilesDischarger().chore(); 422 } 423 } 424 } 425 Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName); 426 HRegionFileSystem regionFs = HRegionFileSystem 427 .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true); 428 return !regionFs.hasReferences(admin.getDescriptor(tableName)); 429 } catch (IOException e) { 430 LOG.warn("Failed check merged region has no reference", e); 431 return false; 432 } 433 }); 434 // run catalog janitor to clean and wait for parent regions are archived 435 UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting(); 436 UTIL.waitFor(timeout, () -> { 437 try { 438 Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName); 439 for (FileStatus fileStatus : fs.listStatus(tableDir)) { 440 String name = fileStatus.getPath().getName(); 441 if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) { 442 return false; 443 } 444 } 445 return true; 446 } catch (IOException e) { 447 LOG.warn("Check if parent regions are archived error", e); 448 return false; 449 } 450 }); 451 // set file modify time and then run cleaner 452 long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000; 453 traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time); 454 UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner(); 455 // scan snapshot 456 try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf, 457 UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) { 458 verifyScanner(scanner, bbb, yyy); 459 } 460 } catch (Exception e) { 461 LOG.error("scan snapshot error", e); 462 Assert.fail("Should not throw Exception: " + e.getMessage()); 463 Assert.fail("Should not throw FileNotFoundException"); 464 Assert.assertTrue(e.getCause() != null); 465 Assert.assertTrue(e.getCause().getCause() instanceof FileNotFoundException); 466 } 467 } 468 469 @Test 470 public void testDeleteTableWithMergedRegions() throws Exception { 471 final TableName tableName = TableName.valueOf(this.name.getMethodName()); 472 String snapshotName = tableName.getNameAsString() + "_snapshot"; 473 Configuration conf = UTIL.getConfiguration(); 474 try (Admin admin = UTIL.getConnection().getAdmin()) { 475 // disable compaction 476 admin.compactionSwitch(false, 477 admin.getRegionServers().stream().map(s -> s.getServerName()).collect(Collectors.toList())); 478 // create table 479 Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3); 480 List<RegionInfo> regions = admin.getRegions(tableName); 481 Assert.assertEquals(3, regions.size()); 482 // write some data 483 UTIL.loadTable(table, FAMILIES); 484 // merge region 485 admin.mergeRegionsAsync(new byte[][] { regions.get(0).getEncodedNameAsBytes(), 486 regions.get(1).getEncodedNameAsBytes() }, false).get(); 487 regions = admin.getRegions(tableName); 488 Assert.assertEquals(2, regions.size()); 489 // snapshot 490 admin.snapshot(snapshotName, tableName); 491 // verify snapshot 492 try (TableSnapshotScanner scanner = 493 new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, 494 new Scan().withStartRow(bbb).withStopRow(yyy))) { 495 verifyScanner(scanner, bbb, yyy); 496 } 497 // drop table 498 admin.disableTable(tableName); 499 admin.deleteTable(tableName); 500 // verify snapshot 501 try (TableSnapshotScanner scanner = 502 new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, 503 new Scan().withStartRow(bbb).withStopRow(yyy))) { 504 verifyScanner(scanner, bbb, yyy); 505 } 506 } 507 } 508 509 private void traverseAndSetFileTime(Path path, long time) throws IOException { 510 fs.setTimes(path, time, -1); 511 if (fs.isDirectory(path)) { 512 List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path)); 513 List<FileStatus> subDirs = 514 allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList()); 515 List<FileStatus> files = 516 allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList()); 517 for (FileStatus subDir : subDirs) { 518 traverseAndSetFileTime(subDir.getPath(), time); 519 } 520 for (FileStatus file : files) { 521 fs.setTimes(file.getPath(), time, -1); 522 } 523 } 524 } 525}