/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link TableSnapshotScanner}. Each test method runs against its own mini cluster,
 * which is started in {@link #setupCluster()} and stopped in {@link #tearDownCluster()}.
 */
@Category({ LargeTests.class, ClientTests.class })
public class TestTableSnapshotScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
  private final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final int NUM_REGION_SERVERS = 2;
  private static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2") };
  public static byte[] bbb = Bytes.toBytes("bbb");
  public static byte[] yyy = Bytes.toBytes("yyy");

  private FileSystem fs;
  private Path rootDir;
  // Cleared by testScanner() once it has shut the HBase cluster down itself.
  private boolean clusterUp;

  @Rule
  public TestName name = new TestName();

  /**
   * Polls the region list of {@code tableName} (at most 100 times, sleeping one second between
   * polls) until it contains at least {@code expectedRegionSize} regions.
   * <p>
   * This is best-effort: the method returns normally even if the expected region count was never
   * reached.
   * @param util               testing utility used to obtain the {@link Admin}
   * @param tableName          table whose regions are counted
   * @param expectedRegionSize minimum number of regions to wait for
   */
  public static void blockUntilSplitFinished(HBaseTestingUtil util, TableName tableName,
    int expectedRegionSize) throws Exception {
    for (int i = 0; i < 100; i++) {
      List<RegionInfo> hRegionInfoList = util.getAdmin().getRegions(tableName);
      if (hRegionInfoList.size() >= expectedRegionSize) {
        break;
      }
      Thread.sleep(1000);
    }
  }

  /** Starts a fresh mini cluster and records its root directory and file system. */
  @Before
  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(NUM_REGION_SERVERS)
        .numDataNodes(NUM_REGION_SERVERS).createRootDir(true).build();
    UTIL.startMiniCluster(option);
    clusterUp = true;
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  }

  /** Shuts the mini cluster down unless a test has already marked it as stopped. */
  @After
  public void tearDownCluster() throws Exception {
    if (clusterUp) {
      UTIL.shutdownMiniCluster();
    }
  }

  /** Applies the configuration required by these tests; currently only enables snapshots. */
  protected void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  /**
   * (Re)creates {@code tableName}, loads it, takes a validated snapshot called
   * {@code snapshotName}, then loads different values and flushes so that the live table has
   * diverged from the snapshot.
   * @param util         testing utility backing the cluster
   * @param tableName    table to create; an existing table of that name is dropped first
   * @param snapshotName name of the snapshot to take
   * @param numRegions   number of regions; values greater than 1 pre-split between {@code bbb}
   *                     and {@code yyy}
   */
  public static void createTableAndSnapshot(HBaseTestingUtil util, TableName tableName,
    String snapshotName, int numRegions) throws Exception {
    try {
      util.deleteTable(tableName);
    } catch (Exception ignored) {
      // Best-effort cleanup: the table usually does not exist yet.
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    Admin admin = util.getAdmin();

    // try-with-resources so the table handle is released even if loading or snapshotting fails
    try (Table table = util.getConnection().getTable(tableName)) {
      // put some stuff in the table
      util.loadTable(table, FAMILIES);

      Path snapshotRootDir = CommonFSUtils.getRootDir(util.getConfiguration());
      FileSystem snapshotFs = snapshotRootDir.getFileSystem(util.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, snapshotRootDir, snapshotFs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      util.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
    }
  }

  /**
   * Splits a loaded table, snapshots it and verifies the snapshot scan. Any exception now
   * propagates and fails the test; it is no longer printed and swallowed.
   */
  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      try (Table table = UTIL.getConnection().getTable(tableName)) {
        // put some stuff in the table
        UTIL.loadTable(table, FAMILIES);

        // split to 2 regions
        admin.split(tableName, Bytes.toBytes("eee"));
        blockUntilSplitFinished(UTIL, tableName, 2);

        Path snapshotRootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
        FileSystem snapshotFs = snapshotRootDir.getFileSystem(UTIL.getConfiguration());

        SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
          null, snapshotName, snapshotRootDir, snapshotFs, true);

        // load different values
        byte[] value = Bytes.toBytes("after_snapshot_value");
        UTIL.loadTable(table, FAMILIES, value);

        // cause flush to create new files in the region
        admin.flush(tableName);
      }

      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan)) {
        verifyScanner(scanner, bbb, yyy);
      }
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /** Verifies that a snapshot scan with {@code setLimit(100)} returns exactly 100 results. */
  @Test
  public void testScanLimit() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan

      int count = 0;
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan)) {
        while (scanner.next() != null) {
          count++;
        }
      }
      Assert.assertEquals(100, count);
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  @Test
  public void testWithSingleRegion() throws Exception {
    testScanner(UTIL, "testWithSingleRegion", 1, false);
  }

  @Test
  public void testWithMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 10, false);
  }

  @Test
  public void testWithOfflineHBaseMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 20, true);
  }

  /**
   * Scans the same snapshot several times: once through the five-argument constructor, then
   * through the six-argument constructor (passing {@code true} as its last argument) against a
   * restore directory populated by {@link RestoreSnapshotHelper#copySnapshotForScanner}.
   */
  @Test
  public void testScannerWithRestoreScanner() throws Exception {
    TableName tableName = TableName.valueOf("testScanner");
    String snapshotName = "testScannerWithRestoreScanner";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      Configuration conf = UTIL.getConfiguration();
      Path confRootDir = CommonFSUtils.getRootDir(conf);

      try (TableSnapshotScanner scanner0 =
        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan)) {
        verifyScanner(scanner0, bbb, yyy);
      }

      // restore snapshot.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, confRootDir, restoreDir,
        snapshotName);

      // scan the snapshot without restoring snapshot
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, confRootDir, restoreDir, snapshotName, scan, true)) {
        verifyScanner(scanner, bbb, yyy);
      }

      // check whether the snapshot has been deleted by the close of scanner.
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, confRootDir, restoreDir, snapshotName, scan, true)) {
        verifyScanner(scanner, bbb, yyy);
      }

      // restore snapshot again.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, confRootDir, restoreDir,
        snapshotName);

      // check whether the snapshot has been deleted by the close of scanner.
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, confRootDir, restoreDir, snapshotName, scan, true)) {
        verifyScanner(scanner, bbb, yyy);
      }
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    }
  }

  /**
   * Creates a table and snapshot, optionally shuts the HBase cluster down, then verifies a
   * snapshot scan over {@code [bbb, yyy)}.
   * @param util            testing utility backing the cluster
   * @param snapshotName    snapshot to create and scan
   * @param numRegions      number of regions of the table
   * @param shutdownCluster when {@code true} the mini HBase cluster is stopped before scanning,
   *                        and snapshot/table cleanup is skipped afterwards
   */
  private void testScanner(HBaseTestingUtil util, String snapshotName, int numRegions,
    boolean shutdownCluster) throws Exception {
    TableName tableName = TableName.valueOf("testScanner");
    try {
      createTableAndSnapshot(util, tableName, snapshotName, numRegions);

      if (shutdownCluster) {
        util.shutdownMiniHBaseCluster();
        clusterUp = false;
      }

      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(util.getConfiguration(), restoreDir, snapshotName, scan)) {
        verifyScanner(scanner, bbb, yyy);
      }
    } finally {
      if (clusterUp) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }

  /**
   * Drains {@code scanner}, checks every row with {@link #verifyRow(Result)} and validates the
   * set of seen rows against the {@code [startRow, stopRow)} tracker.
   */
  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
    throws IOException, InterruptedException {

    HBaseTestingUtil.SeenRowTracker rowTracker =
      new HBaseTestingUtil.SeenRowTracker(startRow, stopRow);

    for (Result result = scanner.next(); result != null; result = scanner.next()) {
      verifyRow(result);
      rowTracker.addRow(result.getRow());
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  /**
   * Asserts that every cell of {@code result} carries the result's row key and that, for each
   * family, the value stored under the qualifier equal to the family name equals the row key.
   */
  private static void verifyRow(Result result) throws IOException {
    byte[] row = result.getRow();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      // assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, cell.getRowArray(),
        cell.getRowOffset(), cell.getRowLength()));
    }

    for (byte[] family : FAMILIES) {
      byte[] actual = result.getValue(family, family);
      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
        + " ,actual:" + Bytes.toString(actual), row, actual);
    }
  }

  /**
   * Merges two regions, snapshots the table, compacts and archives the merge parents, runs the
   * HFile cleaner and finally verifies that the snapshot can still be scanned.
   */
  @Test
  public void testMergeRegion() throws Exception {
    TableName tableName = TableName.valueOf("testMergeRegion");
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
    long timeout = 20000; // 20s
    try (Admin admin = UTIL.getAdmin()) {
      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
        .collect(Collectors.toList());
      // create table with 3 regions and put some data in it
      try (Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3)) {
        UTIL.loadTable(table, FAMILIES);
      }
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      RegionInfo region0 = regions.get(0);
      RegionInfo region1 = regions.get(1);
      RegionInfo region2 = regions.get(2);
      admin.flush(tableName);
      // wait flush is finished
      UTIL.waitFor(timeout, () -> {
        try {
          return isFlushFinished(tableDir, regions);
        } catch (IOException e) {
          LOG.warn("Failed check if flush is finished", e);
          return false;
        }
      });
      // merge 2 regions
      admin.compactionSwitch(false, serverList);
      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
        true);
      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
      // the merged region is whichever of the two remaining regions is not region2
      RegionInfo mergedRegion =
        mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
          ? mergedRegions.get(1)
          : mergedRegions.get(0);
      // snapshot
      admin.snapshot(snapshotName, tableName);
      Assert.assertEquals(1, admin.listSnapshots().size());
      // major compact
      admin.compactionSwitch(true, serverList);
      admin.majorCompactRegion(mergedRegion.getRegionName());
      // wait until merged region has no reference
      UTIL.waitFor(timeout, () -> {
        try {
          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
            .getRegionServerThreads()) {
            HRegionServer regionServer = regionServerThread.getRegionServer();
            for (HRegion subRegion : regionServer.getRegions(tableName)) {
              if (
                subRegion.getRegionInfo().getEncodedName().equals(mergedRegion.getEncodedName())
              ) {
                regionServer.getCompactedHFilesDischarger().chore();
              }
            }
          }
          HRegionFileSystem regionFs = HRegionFileSystem
            .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
          return !regionFs.hasReferences(admin.getDescriptor(tableName));
        } catch (IOException e) {
          LOG.warn("Failed check merged region has no reference", e);
          return false;
        }
      });
      // run catalog janitor to clean and wait for parent regions are archived
      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
      UTIL.waitFor(timeout, () -> {
        try {
          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
            String name = fileStatus.getPath().getName();
            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Check if parent regions are archived error", e);
          return false;
        }
      });
      // set file modify time and then run cleaner
      long time = EnvironmentEdgeManager.currentTime() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().triggerCleanerNow().get();
      // scan snapshot
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
          new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    } catch (Exception e) {
      LOG.error("scan snapshot error", e);
      Assert.fail("Should not throw Exception: " + e.getMessage());
    }
  }

  /**
   * Verifies that a snapshot taken after a region merge can be scanned both before and after
   * the source table is deleted.
   */
  @Test
  public void testDeleteTableWithMergedRegions() throws Exception {
    final TableName tableName = TableName.valueOf(this.name.getMethodName());
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    try (Admin admin = UTIL.getConnection().getAdmin()) {
      // disable compaction
      admin.compactionSwitch(false,
        admin.getRegionServers().stream().map(s -> s.getServerName()).collect(Collectors.toList()));
      // create table and write some data
      try (Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3)) {
        UTIL.loadTable(table, FAMILIES);
      }
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      // merge region
      admin.mergeRegionsAsync(new byte[][] { regions.get(0).getEncodedNameAsBytes(),
        regions.get(1).getEncodedNameAsBytes() }, false).get();
      regions = admin.getRegions(tableName);
      Assert.assertEquals(2, regions.size());
      // snapshot
      admin.snapshot(snapshotName, tableName);
      // verify snapshot
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
          new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
      // drop table
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
      // verify snapshot
      try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
          new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    }
  }

  /**
   * Returns {@code true} once every family directory of every given region contains at least
   * one entry other than {@code .filelist}. A region without family directories does not block
   * the result.
   */
  private boolean isFlushFinished(Path tableDir, List<RegionInfo> regions) throws IOException {
    for (RegionInfo region : regions) {
      Path regionDir = new Path(tableDir, region.getEncodedName());
      for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
        if (!containsFlushedFile(familyDir)) {
          return false;
        }
      }
    }
    return true;
  }

  /** Returns whether {@code familyDir} lists any entry whose name is not {@code .filelist}. */
  private boolean containsFlushedFile(Path familyDir) throws IOException {
    for (FileStatus status : fs.listStatus(familyDir)) {
      if (!status.getPath().getName().equals(".filelist")) {
        return true;
      }
    }
    return false;
  }

  /**
   * Sets the modification time of {@code path} and, if it is a directory, of everything beneath
   * it to {@code time}. The access time is passed as {@code -1} in every call.
   */
  private void traverseAndSetFileTime(Path path, long time) throws IOException {
    fs.setTimes(path, time, -1);
    if (!fs.isDirectory(path)) {
      return;
    }
    for (FileStatus child : fs.listStatus(path)) {
      if (child.isDirectory()) {
        traverseAndSetFileTime(child.getPath(), time);
      } else if (child.isFile()) {
        fs.setTimes(child.getPath(), time, -1);
      }
    }
  }
}