/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link TableSnapshotScanner}, which scans table snapshots directly from the
 * filesystem, including scans over single- and multi-region tables, scans after splits and
 * merges, and scans with the HBase cluster offline.
 */
@Category({LargeTests.class, ClientTests.class})
public class TestTableSnapshotScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final int NUM_REGION_SERVERS = 2;
  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
  public static byte[] bbb = Bytes.toBytes("bbb");
  public static byte[] yyy = Bytes.toBytes("yyy");

  private FileSystem fs;
  private Path rootDir;

  @Rule
  public TestName name = new TestName();
  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
      int expectedRegionSize) throws Exception {
    for (int i = 0; i < 100; i++) {
      List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName);
      if (hRegionInfoList.size() >= expectedRegionSize) {
        break;
      }
      Thread.sleep(1000);
    }
  }

  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    StartMiniClusterOption option = StartMiniClusterOption.builder()
      .numRegionServers(NUM_REGION_SERVERS).numDataNodes(NUM_REGION_SERVERS)
      .createRootDir(true).build();
    UTIL.startMiniCluster(option);
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  }

  public void tearDownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private static void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  @After
  public void tearDown() throws Exception {
  }

  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
      String snapshotName, int numRegions) throws Exception {
    try {
      util.deleteTable(tableName);
    } catch (Exception ex) {
      // ignore
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    Admin admin = util.getAdmin();

    // put some stuff in the table
    Table table = util.getConnection().getTable(tableName);
    util.loadTable(table, FAMILIES);

    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
      Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

    // load different values
    byte[] value = Bytes.toBytes("after_snapshot_value");
    util.loadTable(table, FAMILIES, value);

    // cause flush to create new files in the region
    admin.flush(tableName);
    table.close();
  }

  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      // put some stuff in the table
      Table table = UTIL.getConnection().getTable(tableName);
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
        Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
      table.close();

      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      TableSnapshotScanner scanner =
        new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);

      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Test
  public void testScanLimit() throws Exception {
    setupCluster();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    final String snapshotName = tableName + "Snapshot";
    TableSnapshotScanner scanner = null;
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).setLimit(100); // limit the scan

      scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan);
      int count = 0;
      while (true) {
        Result result = scanner.next();
        if (result == null) {
          break;
        }
        count++;
      }
      Assert.assertEquals(100, count);
    } finally {
      if (scanner != null) {
        scanner.close();
      }
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  @Test
  public void testWithSingleRegion() throws Exception {
    testScanner(UTIL, "testWithSingleRegion", 1, false);
  }

  @Test
  public void testWithMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 10, false);
  }

  @Test
  public void testWithOfflineHBaseMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 20, true);
  }

  @Test
  public void testScannerWithRestoreScanner() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    String snapshotName = "testScannerWithRestoreScanner";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(bbb, yyy); // limit the scan

      Configuration conf = UTIL.getConfiguration();
      Path rootDir = CommonFSUtils.getRootDir(conf);

      TableSnapshotScanner scanner0 =
        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
      verifyScanner(scanner0, bbb, yyy);
      scanner0.close();

      // restore snapshot.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // scan the snapshot without restoring snapshot
      TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();

      // check whether the snapshot has been deleted by the close of scanner.
      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();

      // restore snapshot again.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // check whether the snapshot has been deleted by the close of scanner.
      scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
      boolean shutdownCluster) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    try {
      createTableAndSnapshot(util, tableName, snapshotName, numRegions);

      if (shutdownCluster) {
        util.shutdownMiniHBaseCluster();
      }

      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(bbb, yyy); // limit the scan

      TableSnapshotScanner scanner = new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir,
        snapshotName, scan);

      verifyScanner(scanner, bbb, yyy);
      scanner.close();
    } finally {
      if (!shutdownCluster) {
        util.getAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
        tearDownCluster();
      }
    }
  }

  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {

    HBaseTestingUtility.SeenRowTracker rowTracker =
      new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    while (true) {
      Result result = scanner.next();
      if (result == null) {
        break;
      }
      verifyRow(result);
      rowTracker.addRow(result.getRow());
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  private static void verifyRow(Result result) throws IOException {
    byte[] row = result.getRow();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      // assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
    }

    for (int j = 0; j < FAMILIES.length; j++) {
      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
        + " ,actual:" + Bytes.toString(actual), row, actual);
    }
  }

  @Test
  public void testMergeRegion() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testMergeRegion");
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    long timeout = 20000; // 20s
    try (Admin admin = UTIL.getAdmin()) {
      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
        .collect(Collectors.toList());
      // create table with 3 regions
      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      RegionInfo region0 = regions.get(0);
      RegionInfo region1 = regions.get(1);
      RegionInfo region2 = regions.get(2);
      // put some data in the table
      UTIL.loadTable(table, FAMILIES);
      admin.flush(tableName);
      // wait until the flush is finished
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          for (RegionInfo region : regions) {
            Path regionDir = new Path(tableDir, region.getEncodedName());
            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
              if (fs.listStatus(familyDir).length != 1) {
                return false;
              }
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check whether flush is finished", e);
          return false;
        }
      });
      // merge 2 regions
      admin.compactionSwitch(false, serverList);
      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
        true);
      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
      RegionInfo mergedRegion =
        mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
          ? mergedRegions.get(1)
          : mergedRegions.get(0);
      // snapshot
      admin.snapshot(snapshotName, tableName);
      Assert.assertEquals(1, admin.listSnapshots().size());
      // major compact
      admin.compactionSwitch(true, serverList);
      admin.majorCompactRegion(mergedRegion.getRegionName());
      // wait until the merged region has no references
      UTIL.waitFor(timeout, () -> {
        try {
          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
            .getRegionServerThreads()) {
            HRegionServer regionServer = regionServerThread.getRegionServer();
            for (HRegion subRegion : regionServer.getRegions(tableName)) {
              if (subRegion.getRegionInfo().getEncodedName()
                .equals(mergedRegion.getEncodedName())) {
                regionServer.getCompactedHFilesDischarger().chore();
              }
            }
          }
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          HRegionFileSystem regionFs = HRegionFileSystem
            .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
          return !regionFs.hasReferences(admin.getDescriptor(tableName));
        } catch (IOException e) {
          LOG.warn("Failed to check whether the merged region has references", e);
          return false;
        }
      });
      // run the catalog janitor and wait until the parent regions are archived
      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
            String name = fileStatus.getPath().getName();
            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check whether the parent regions are archived", e);
          return false;
        }
      });
      // set file modify time and then run cleaner
      long time = System.currentTimeMillis() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
      // scan snapshot
      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
        UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    } catch (Exception e) {
      LOG.error("scan snapshot error", e);
      Assert.fail("Should not throw FileNotFoundException");
      Assert.assertTrue(e.getCause() != null);
      Assert.assertTrue(e.getCause().getCause() instanceof FileNotFoundException);
    } finally {
      tearDownCluster();
    }
  }

  private void traverseAndSetFileTime(Path path, long time) throws IOException {
    fs.setTimes(path, time, -1);
    if (fs.isDirectory(path)) {
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
      List<FileStatus> subDirs =
        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
      for (FileStatus subDir : subDirs) {
        traverseAndSetFileTime(subDir.getPath(), time);
      }
      for (FileStatus file : files) {
        fs.setTimes(file.getPath(), time, -1);
      }
    }
  }
}