/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link TableSnapshotScanner}. Each test starts its own mini cluster via
 * {@link #setupCluster()} and is responsible for shutting it down again.
 */
@Category({LargeTests.class, ClientTests.class})
public class TestTableSnapshotScanner {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTableSnapshotScanner.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestTableSnapshotScanner.class);
  private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final int NUM_REGION_SERVERS = 2;
  private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
  /** Inclusive start row used to pre-split tables and to bound scans. */
  public static final byte[] bbb = Bytes.toBytes("bbb");
  /** Exclusive stop row used to pre-split tables and to bound scans. */
  public static final byte[] yyy = Bytes.toBytes("yyy");

  private FileSystem fs;
  private Path rootDir;

  /**
   * Polls the table's region count once per second, for at most 100 attempts, until it is at
   * least {@code expectedRegionSize}. If the count is never reached this returns silently, so
   * callers must not assume the split actually completed.
   */
  public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
      int expectedRegionSize) throws Exception {
    for (int i = 0; i < 100; i++) {
      List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName);
      if (hRegionInfoList.size() >= expectedRegionSize) {
        break;
      }
      Thread.sleep(1000);
    }
  }

  /**
   * Starts a mini cluster with snapshots enabled and {@link #NUM_REGION_SERVERS} region servers
   * and data nodes, then caches its root dir and file system in {@link #rootDir}/{@link #fs}.
   */
  public void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numRegionServers(NUM_REGION_SERVERS).numDataNodes(NUM_REGION_SERVERS)
        .createRootDir(true).build();
    UTIL.startMiniCluster(option);
    rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    fs = rootDir.getFileSystem(UTIL.getConfiguration());
  }

  /** Shuts down the whole mini cluster started by {@link #setupCluster()}. */
  public void tearDownCluster() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  private static void setupConf(Configuration conf) {
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  @After
  public void tearDown() throws Exception {
    // Intentionally empty: each test tears down its own cluster.
  }

  /**
   * Deletes the snapshot and table a test created, then shuts the mini cluster down. The
   * shutdown runs in a {@code finally} so that a failing deletion (for example because the
   * snapshot was never created) cannot leave the cluster running.
   */
  private void cleanUpAndTearDown(String snapshotName, TableName tableName) throws Exception {
    try {
      UTIL.getAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
    } finally {
      tearDownCluster();
    }
  }

  /**
   * (Re)creates {@code tableName}, loads it, and takes and validates snapshot
   * {@code snapshotName}. It then loads different values and flushes, so the live table
   * diverges from the snapshot.
   *
   * @param numRegions when greater than 1, the table is pre-split between {@link #bbb} and
   *          {@link #yyy} into this many regions; otherwise it is created with a single region
   */
  public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
      String snapshotName, int numRegions) throws Exception {
    try {
      util.deleteTable(tableName);
    } catch (Exception ignored) {
      // The table may not exist yet; a failed delete is harmless here.
    }

    if (numRegions > 1) {
      util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
    } else {
      util.createTable(tableName, FAMILIES);
    }
    Admin admin = util.getAdmin();

    try (Table table = util.getConnection().getTable(tableName)) {
      // put some stuff in the table
      util.loadTable(table, FAMILIES);

      Path rootDir = FSUtils.getRootDir(util.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
          Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      util.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
    }
  }

  @Test
  public void testNoDuplicateResultsWhenSplitting() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
    String snapshotName = "testSnapshotBug";
    try {
      if (UTIL.getAdmin().tableExists(tableName)) {
        UTIL.deleteTable(tableName);
      }

      UTIL.createTable(tableName, FAMILIES);
      Admin admin = UTIL.getAdmin();

      try (Table table = UTIL.getConnection().getTable(tableName)) {
        // put some stuff in the table
        UTIL.loadTable(table, FAMILIES);

        // split to 2 regions
        admin.split(tableName, Bytes.toBytes("eee"));
        blockUntilSplitFinished(UTIL, tableName, 2);

        Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
        FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

        SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
            Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

        // load different values
        byte[] value = Bytes.toBytes("after_snapshot_value");
        UTIL.loadTable(table, FAMILIES, value);

        // cause flush to create new files in the region
        admin.flush(tableName);
      }

      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      try (TableSnapshotScanner scanner =
          new TableSnapshotScanner(UTIL.getConfiguration(), restoreDir, snapshotName, scan)) {
        verifyScanner(scanner, bbb, yyy);
      }
    } finally {
      cleanUpAndTearDown(snapshotName, tableName);
    }
  }

  @Test
  public void testWithSingleRegion() throws Exception {
    testScanner(UTIL, "testWithSingleRegion", 1, false);
  }

  @Test
  public void testWithMultiRegion() throws Exception {
    testScanner(UTIL, "testWithMultiRegion", 10, false);
  }

  @Test
  public void testWithOfflineHBaseMultiRegion() throws Exception {
    testScanner(UTIL, "testWithOfflineHBaseMultiRegion", 20, true);
  }

  @Test
  public void testScannerWithRestoreScanner() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    String snapshotName = "testScannerWithRestoreScanner";
    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      Configuration conf = UTIL.getConfiguration();
      Path rootDir = FSUtils.getRootDir(conf);

      try (TableSnapshotScanner scanner0 =
          new TableSnapshotScanner(conf, restoreDir, snapshotName, scan)) {
        verifyScanner(scanner0, bbb, yyy);
      }

      // restore snapshot.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // scan the snapshot without restoring snapshot
      try (TableSnapshotScanner scanner =
          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true)) {
        verifyScanner(scanner, bbb, yyy);
      }

      // check that the restored snapshot is still scannable after the previous scanner closed.
      try (TableSnapshotScanner scanner =
          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true)) {
        verifyScanner(scanner, bbb, yyy);
      }

      // restore snapshot again.
      RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

      // check that scanning still works after restoring into the same directory again.
      try (TableSnapshotScanner scanner =
          new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true)) {
        verifyScanner(scanner, bbb, yyy);
      }
    } finally {
      cleanUpAndTearDown(snapshotName, tableName);
    }
  }

  /**
   * Creates a table and snapshot, optionally stops HBase, and then scans the snapshot between
   * {@link #bbb} and {@link #yyy}, verifying every row.
   *
   * @param shutdownCluster if true, HBase is stopped before the scan, so the scan runs with only
   *          the underlying file system available
   */
  private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
      boolean shutdownCluster) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testScanner");
    try {
      createTableAndSnapshot(util, tableName, snapshotName, numRegions);

      if (shutdownCluster) {
        util.shutdownMiniHBaseCluster();
      }

      Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan().withStartRow(bbb).withStopRow(yyy); // limit the scan

      try (TableSnapshotScanner scanner =
          new TableSnapshotScanner(util.getConfiguration(), restoreDir, snapshotName, scan)) {
        verifyScanner(scanner, bbb, yyy);
      }
    } finally {
      if (shutdownCluster) {
        // HBase may already be down, so the snapshot and table cannot be deleted through the
        // admin. Still shut down the rest of the mini cluster (e.g. its DFS data nodes).
        tearDownCluster();
      } else {
        cleanUpAndTearDown(snapshotName, tableName);
      }
    }
  }

  /**
   * Drains {@code scanner}, checking each row with {@link #verifyRow(Result)}, and asserts that
   * exactly the rows in [{@code startRow}, {@code stopRow}) were seen.
   */
  private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
      throws IOException, InterruptedException {
    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    for (Result result = scanner.next(); result != null; result = scanner.next()) {
      verifyRow(result);
      rowTracker.addRow(result.getRow());
    }

    // validate all rows are seen
    rowTracker.validate();
  }

  /**
   * Asserts that every cell in {@code result} belongs to the result's row. It also asserts that,
   * for each family {@code f}, the value of column {@code f:f} equals the row key.
   */
  private static void verifyRow(Result result) throws IOException {
    byte[] row = result.getRow();
    CellScanner scanner = result.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();

      //assert that all Cells in the Result have the same key
      Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
    }

    for (int j = 0; j < FAMILIES.length; j++) {
      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
      Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
          + " ,actual:" + Bytes.toString(actual), row, actual);
    }
  }

  /**
   * Merges two regions of a table and snapshots it. It then major-compacts the merged region,
   * waits for the merge parents to leave the table directory, ages the archive past the HFile
   * TTL, and runs the HFile cleaner. Finally it checks that the snapshot can still be scanned.
   * The scan is expected not to throw; in particular, it must not fail with a
   * {@link FileNotFoundException}.
   */
  @Test
  public void testMergeRegion() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testMergeRegion");
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    long timeout = 20000; // 20s
    try (Admin admin = UTIL.getAdmin()) {
      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
          .collect(Collectors.toList());
      // create table with 3 regions
      final List<RegionInfo> regions;
      try (Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3)) {
        regions = admin.getRegions(tableName);
        Assert.assertEquals(3, regions.size());
        // put some data in the table
        UTIL.loadTable(table, FAMILIES);
      }
      RegionInfo region0 = regions.get(0);
      RegionInfo region1 = regions.get(1);
      RegionInfo region2 = regions.get(2);
      admin.flush(tableName);
      // wait flush is finished
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          for (RegionInfo region : regions) {
            Path regionDir = new Path(tableDir, region.getEncodedName());
            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
              if (fs.listStatus(familyDir).length != 1) {
                return false;
              }
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed check if flush is finished", e);
          return false;
        }
      });
      // merge 2 regions
      admin.compactionSwitch(false, serverList);
      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
          true);
      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
      RegionInfo mergedRegion =
          mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
              ? mergedRegions.get(1)
              : mergedRegions.get(0);
      // snapshot
      admin.snapshot(snapshotName, tableName);
      Assert.assertEquals(1, admin.listSnapshots().size());
      // major compact
      admin.compactionSwitch(true, serverList);
      admin.majorCompactRegion(mergedRegion.getRegionName());
      // wait until merged region has no reference
      UTIL.waitFor(timeout, () -> {
        try {
          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
              .getRegionServerThreads()) {
            HRegionServer regionServer = regionServerThread.getRegionServer();
            for (HRegion subRegion : regionServer.getRegions(tableName)) {
              if (subRegion.getRegionInfo().getEncodedName()
                  .equals(mergedRegion.getEncodedName())) {
                regionServer.getCompactedHFilesDischarger().chore();
              }
            }
          }
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          HRegionFileSystem regionFs = HRegionFileSystem
              .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
          return !regionFs.hasReferences(admin.getDescriptor(tableName));
        } catch (IOException e) {
          LOG.warn("Failed check merged region has no reference", e);
          return false;
        }
      });
      // run catalog janitor to clean and wait for parent regions are archived
      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
            String name = fileStatus.getPath().getName();
            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Check if parent regions are archived error", e);
          return false;
        }
      });
      // set file modify time and then run cleaner
      long time = System.currentTimeMillis() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
      // scan snapshot
      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
          UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName,
          new Scan().withStartRow(bbb).withStopRow(yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    } catch (Exception e) {
      LOG.error("scan snapshot error", e);
      // Keep the original exception as the cause so the failure report shows what was thrown.
      throw new AssertionError("Should not throw FileNotFoundException", e);
    } finally {
      tearDownCluster();
    }
  }

  /**
   * Recursively sets the modification time of {@code path} and of every directory and regular
   * file beneath it to {@code time}. The access time is passed as -1, which Hadoop's
   * {@code FileSystem#setTimes} documents as "leave unchanged".
   */
  private void traverseAndSetFileTime(Path path, long time) throws IOException {
    fs.setTimes(path, time, -1);
    if (!fs.isDirectory(path)) {
      return;
    }
    for (FileStatus child : fs.listStatus(path)) {
      if (child.isDirectory()) {
        traverseAndSetFileTime(child.getPath(), time);
      } else if (child.isFile()) {
        fs.setTimes(child.getPath(), time, -1);
      }
    }
  }
}