/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests scanning table snapshots directly from the filesystem with {@link TableSnapshotScanner},
 * covering single- and multi-region tables, scans against an offline cluster, scan limits,
 * region splits and merges, and reuse of an already-restored snapshot directory.
 */
@Category({LargeTests.class, ClientTests.class})
public class TestTableSnapshotScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final int NUM_REGION_SERVERS = 2;
  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
  public static byte[] bbb = Bytes.toBytes("bbb");
  public static byte[] yyy = Bytes.toBytes("yyy");

  private FileSystem fs;
  private Path rootDir;

  @Rule
  public TestName name = new TestName();
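
  /*
   * For reference, a minimal sketch of typical TableSnapshotScanner usage outside of these
   * tests: it scans snapshot files directly from the filesystem without going through any
   * region server. The snapshot name and restore directory below are illustrative placeholders
   * only; the restore directory needs to be on the same filesystem as the HBase root directory
   * (but not under it), and its restored contents are cleaned up when the scanner is closed.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Path restoreDir = new Path("/tmp/snapshot-restore"); // hypothetical scratch directory
   *   Scan scan = new Scan().withStartRow(Bytes.toBytes("bbb")).withStopRow(Bytes.toBytes("yyy"));
   *   try (TableSnapshotScanner scanner =
   *       new TableSnapshotScanner(conf, restoreDir, "mySnapshot", scan)) {
   *     for (Result result : scanner) {
   *       // process each Result read from the snapshot's HFiles
   *     }
   *   }
   */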

  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
      int expectedRegionSize) throws Exception {
    for (int i = 0; i < 100; i++) {
      List<RegionInfo> hRegionInfoList = util.getAdmin().getRegions(tableName);
      if (hRegionInfoList.size() >= expectedRegionSize) {
        break;
      }
      Thread.sleep(1000);
    }
  }

  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numRegionServers(NUM_REGION_SERVERS).numDataNodes(NUM_REGION_SERVERS)
        .createRootDir(true).build();
    UTIL.startMiniCluster(option);
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  }

  public void tearDownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private static void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  @After
  public void tearDown() throws Exception {
  }

  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
      String snapshotName, int numRegions)
      throws Exception {
    try {
      util.deleteTable(tableName);
    } catch (Exception ex) {
      // ignore
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    Admin admin = util.getAdmin();

    // put some stuff in the table
    Table table = util.getConnection().getTable(tableName);
    util.loadTable(table, FAMILIES);

    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

    // load different values
    byte[] value = Bytes.toBytes("after_snapshot_value");
    util.loadTable(table, FAMILIES, value);

    // cause flush to create new files in the region
    admin.flush(tableName);
    table.close();
  }

  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
          Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      TableSnapshotScanner scanner =
          new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);

      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Test
  public void testScanLimit() throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    TableSnapshotScanner scanner = null;
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan

      scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
      int count = 0;
      while (true) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        count++;
      }
      Assert.assertEquals(100, count);
    } finally {
      if (scanner != null) {
        scanner.close();
      }
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Test
  public void testWithSingleRegion() throws Exception {
    testScanner(UTIL, "testWithSingleRegion", 1, false);
  }

  @Test
  public void testWithMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 10, false);
  }

  @Test
  public void testWithOfflineHBaseMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 20, true);
  }

  @Test
  public void testScannerWithRestoreScanner() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    String snapshotName = "testScannerWithRestoreScanner";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      Configuration conf = UTIL.getConfiguration();
      Path rootDir = CommonFSUtils.getRootDir(conf);

      TableSnapshotScanner scanner0 =
          new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
      verifyScanner(scanner0, bbb, yyy);
      scanner0.close();

      // restore snapshot.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // scan the snapshot without restoring it again
      TableSnapshotScanner scanner =
          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();

      // verify that the restored snapshot files were not deleted by the close of the scanner
      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();

      // restore snapshot again.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // verify again that the restored snapshot files were not deleted by the close of the scanner
      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
      boolean shutdownCluster) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    try {
      createTableAndSnapshot(util, tableName, snapshotName, numRegions);

      if (shutdownCluster) {
        util.shutdownMiniHBaseCluster();
      }

      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      TableSnapshotScanner scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir,
          snapshotName, scan);

      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
        tearDownCluster();
      }
    }
  }

  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {

    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    while (true) {
      Result result = scanner.next();
      if (result == null) {
        break;
      }
      verifyRow(result);
      rowTracker.addRow(result.getRow());
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  private static void verifyRow(Result result) throws IOException {
    byte[] row = result.getRow();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      // assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
    }

    for (int j = 0; j < FAMILIES.length; j++) {
      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
          + " ,actual:" + Bytes.toString(actual), row, actual);
    }
  }

  @Test
  public void testMergeRegion() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testMergeRegion");
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    long timeout = 20000; // 20s
    try (Admin admin = UTIL.getAdmin()) {
      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
          .collect(Collectors.toList());
      // create table with 3 regions
      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      RegionInfo region0 = regions.get(0);
      RegionInfo region1 = regions.get(1);
      RegionInfo region2 = regions.get(2);
      // put some data in the table
      UTIL.loadTable(table, FAMILIES);
      admin.flush(tableName);
      // wait until flush is finished
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          for (RegionInfo region : regions) {
            Path regionDir = new Path(tableDir, region.getEncodedName());
            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
              if (fs.listStatus(familyDir).length != 1) {
                return false;
              }
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check whether flush is finished", e);
          return false;
        }
      });
      // merge 2 regions
      admin.compactionSwitch(false, serverList);
      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
          true);
      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
      RegionInfo mergedRegion =
          mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
              ? mergedRegions.get(1)
              : mergedRegions.get(0);
      // snapshot
      admin.snapshot(snapshotName, tableName);
      Assert.assertEquals(1, admin.listSnapshots().size());
      // major compact
      admin.compactionSwitch(true, serverList);
      admin.majorCompactRegion(mergedRegion.getRegionName());
      // wait until the merged region has no references
      UTIL.waitFor(timeout, () -> {
        try {
          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
              .getRegionServerThreads()) {
            HRegionServer regionServer = regionServerThread.getRegionServer();
            for (HRegion subRegion : regionServer.getRegions(tableName)) {
              if (subRegion.getRegionInfo().getEncodedName()
                  .equals(mergedRegion.getEncodedName())) {
                regionServer.getCompactedHFilesDischarger().chore();
              }
            }
          }
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          HRegionFileSystem regionFs = HRegionFileSystem
              .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
          return !regionFs.hasReferences(admin.getDescriptor(tableName));
        } catch (IOException e) {
          LOG.warn("Failed to check whether the merged region still has references", e);
          return false;
        }
      });
      // run the catalog janitor and wait until the parent regions are archived
      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
            String name = fileStatus.getPath().getName();
            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check whether the parent regions are archived", e);
          return false;
        }
      });
      // set file modification time beyond the TTL and then run the HFile cleaner
      long time = System.currentTimeMillis() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
      // scan the snapshot; this must not fail with a FileNotFoundException
      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
          UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
          new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    } catch (Exception e) {
      LOG.error("scan snapshot error", e);
      Assert.fail("Should not throw FileNotFoundException");
    } finally {
      tearDownCluster();
    }
  }

  private void traverseAndSetFileTime(Path path, long time) throws IOException {
    fs.setTimes(path, time, -1);
    if (fs.isDirectory(path)) {
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
      List<FileStatus> subDirs =
          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
      for (FileStatus subDir : subDirs) {
        traverseAndSetFileTime(subDir.getPath(), time);
      }
      for (FileStatus file : files) {
        fs.setTimes(file.getPath(), time, -1);
      }
    }
  }
}