/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the HFiles of a region are properly long-term archived as
 * specified via the {@link ZKTableArchiveClient}.
 */
@Category({MiscTests.class, MediumTests.class})
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static ClusterConnection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

  /**
   * Set up the config for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    CONNECTION = (ClusterConnection) ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // make the hfile archiving node so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = new DirScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    if (CONNECTION != null) {
      CONNECTION.close();
    }
    UTIL.shutdownMiniZKCluster();
    if (POOL != null) {
      POOL.shutdownNow();
    }
  }

  /**
   * Test turning archiving on/off
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.",
      archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. turn off archiving for the single table and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }
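
  // How the archiving switch is persisted (a sketch; the exact znode names are implementation
  // details of ZKTableArchiveClient): enabling archiving for a table creates a child of the
  // archive znode (see getArchiveZNode in setupCluster above) named after the table, and disabling
  // deletes that child. LongTermArchivingHFileCleaner watches those znodes to decide which tables'
  // archived HFiles must be retained.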

  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // now that the cleaner has run, check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory; both tables should have archived files
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for: " + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for: " + otherTable, initialCountForOtherTable > 0);

    // run the cleaners, checking for each of the directories + files (both should be deleted and
    // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");

    // now that the cleaner has run, check all the files again: the archived table's files should
    // be retained, while the other table's files should be gone
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file,
        tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }
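
  // Note on the cleaner semantics exercised here: HFileCleaner only deletes an archived file when
  // every configured delegate reports it deletable. With LongTermArchivingHFileCleaner plugged in
  // above, files belonging to a table whose archiving znode exists are never reported deletable,
  // which is why the archived table's files survive the cleaner runs in these tests.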

  /**
   * Start archiving the given table and wait for the change to propagate to the given cleaner
   * @param tableName table to archive
   * @param cleaner cleaner to check to make sure the change propagated
   * @return the cleaner's delegates; the first entry is the underlying
   *         {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException on failure
   * @throws KeeperException on failure
   */
  @SuppressWarnings("checkstyle:EmptyBlock")
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table:" + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(tableName)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
    // single-element array so the anonymous Answer below can mutate the count despite Java's
    // effectively-final capture rule
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/" + expected + ") Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
    cleaners.set(0, delegateSpy);

    return finished;
  }
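
  // Usage pattern for the latch above (as exercised by runCleaner and testMultipleTables): gate
  // the test until the spied delegate has been consulted for every file the cleaner is expected
  // to examine, e.g.
  //   CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
  //   choreService.scheduleChore(cleaner);
  //   finished.await();
  //   stop.stop("");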

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param dir directory to investigate
   * @return all files under the directory
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under: " + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());

        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }

        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }

  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Don't have the expected store files, wanted >= 2 store files, but was:" + count,
      count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Run the given cleaner on its own chore service, wait (via the latch) until it has checked all
   * the expected files, and then stop it.
   * @param cleaner the cleaner to use
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}