/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of a region are properly long-term archived
 * as specified via the {@link ZKTableArchiveClient}.
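 * <p>
 * For orientation, a minimal sketch of the archiving client calls exercised below (all taken from
 * this test's own usage; the {@code conf}, {@code connection}, and {@code tableName} variables are
 * illustrative stand-ins):
 *
 * <pre>
 * ZKTableArchiveClient client = new ZKTableArchiveClient(conf, connection);
 * client.enableHFileBackupAsync(tableName); // request long-term archiving for one table
 * client.getArchivingEnabled(tableName);    // poll until the change propagates
 * client.disableHFileBackup();              // turn archiving off for all tables
 * </pre>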
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static ClusterConnection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

  /**
   * Set up the config for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    CONNECTION = (ClusterConnection) ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // create the hfile archiving znode so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = new DirScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        FSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    CONNECTION.close();
    UTIL.shutdownMiniZKCluster();
    POOL.shutdownNow();
  }

  /**
   * Test turning archiving on/off, both cluster-wide and for a single table
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.",
      archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. turn off archiving for the single table and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }
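
  /**
   * Archive files for a single tracked table, run the cleaner over them, and verify that every
   * archived file is retained while the archive directory itself survives.
   */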
  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // we know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
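    // expose the region through the mocked RegionServerServices so the discharger can find it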
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory; they should have been archived
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      FSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for:" + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for:" + otherTable, initialCountForOtherTable > 0);

    // run the cleaners, checking for each of the directories + files (both should be deleted and
    // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");

    // we know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file,
        tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
    FSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }
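
  /**
   * Register the {@link LongTermArchivingHFileCleaner} as the master's hfile cleaner plugin and
   * build an {@link HFileCleaner} over the given archive directory.
   */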
  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
      Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving the given table for the given hfile cleaner
   * @param tableName table to archive
   * @param cleaner cleaner to check to make sure the change propagated
   * @return the cleaner's delegates; the first entry is the underlying
   *         {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException on failure
   * @throws KeeperException on failure
   */
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
      throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table:" + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
      List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }
        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param dir directory to investigate
   * @return all files under the directory, or <tt>null</tt> if there are none
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = FSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under:" + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());
        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }
        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }
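
  /**
   * Create two store files in the region for the given family and compact them into one, making
   * the originals eligible for removal (and hence archiving) by the discharger.
   */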
  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Don't have the expected store files, wanted >= 2 store files, but was:" + count,
      count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Schedule the given cleaner chore, wait until it has checked the expected number of files, and
   * then stop it.
   * @param cleaner the cleaner chore to run
   * @param finished latch from {@link #setupCleanerWatching} that releases once the cleaner has
   *          seen all the files
   * @param stop stoppable used to halt the cleaner after the latch releases
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
      throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}