/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of a region are properly long-term archived as
 * specified via the {@link ZKTableArchiveClient}.
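 * <p>
 * In outline, the archiving client calls exercised below (a sketch drawn from this test, not a
 * complete API reference):
 *
 * <pre>
 * ZKTableArchiveClient client = new ZKTableArchiveClient(conf, connection);
 * client.enableHFileBackupAsync(tableName); // request long-term archiving for a table
 * client.getArchivingEnabled(tableName);    // check whether archiving is on
 * client.disableHFileBackup(tableName);     // turn archiving off for one table
 * client.disableHFileBackup();              // turn archiving off for all tables
 * </pre>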
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static ClusterConnection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

  /**
   * Set up the config for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    CONNECTION = (ClusterConnection) ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // make the hfile archiving node so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    if (CONNECTION != null) {
      CONNECTION.close();
    }
    UTIL.shutdownMiniZKCluster();
    if (POOL != null) {
      POOL.shutdownNow();
    }
  }

  /**
   * Test turning archiving on/off
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.",
      archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. Check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }

  /**
   * Test that archived files for a single table are retained by the cleaner while archiving is
   * enabled for that table.
   */
  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // we know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory; they should have been archived
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for: " + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for: " + otherTable, initialCountForOtherTable > 0);

    // run the cleaners, checking for each of the directories + files (both should be deleted and
    // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");

    // we know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file,
        tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving the given table and wait for the change to propagate to the given hfile
   * cleaner
   * @param tableName table to archive
   * @param cleaner   cleaner to check to make sure the change propagated
   * @return the cleaner's delegates, the first of which is the
   *         {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException     on failure
   * @throws KeeperException on failure
   */
  @SuppressWarnings("checkstyle:EmptyBlock")
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table: " + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files.
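   * <p>
   * The spy counts each call to {@code getDeletableFiles} and counts down the returned latch once
   * at least {@code expected} calls have been observed.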
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param dir directory to investigate
   * @return all files under the directory
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under: " + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());

        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }

        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }

  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Don't have the expected store files, wanted >= 2 store files, but was: " + count,
      count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region       region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Run the given cleaner on a chore service, wait for the latch to release, then stop it
   * @param cleaner the cleaner to use
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}