/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of region are properly long-term archived as
 * specified via the {@link ZKTableArchiveClient}.
 */
@Tag(MiscTests.TAG)
@Tag(MediumTests.TAG)
public class TestZooKeeperTableArchiveClient {

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  /** Directories registered by a test that {@link #tearDown()} removes afterwards. */
  private final List<Path> toCleanup = new ArrayList<>();
  /** Chore services started by a test; shut down in {@link #tearDown()} so threads don't leak. */
  private final List<ChoreService> choreServices = new ArrayList<>();
  private static Connection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

  /**
   * Connection registry that returns a fixed cluster id, so a {@link Connection} can be created
   * while only the mini ZooKeeper cluster is running.
   */
  public static final class MockRegistry extends DoNothingConnectionRegistry {

    public MockRegistry(Configuration conf, User user) {
      super(conf, user);
    }

    @Override
    public CompletableFuture<String> getClusterId() {
      return CompletableFuture.completedFuture("clusterId");
    }
  }

  /**
   * Setup the config for the cluster
   */
  @BeforeAll
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
      MockRegistry.class, ConnectionRegistry.class);
    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // make hfile archiving node so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @AfterEach
  public void tearDown() throws Exception {
    // shut down chore threads first so that no cleaner is still scanning while the directories
    // below are removed; this happens after the test's assertions, so it cannot affect them
    for (ChoreService service : choreServices) {
      service.shutdown();
    }
    choreServices.clear();
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterAll
  public static void cleanupTest() throws Exception {
    // nested try/finally so that a failure in one step does not skip the remaining shutdowns
    try {
      if (CONNECTION != null) {
        CONNECTION.close();
      }
    } finally {
      try {
        UTIL.shutdownMiniZKCluster();
      } finally {
        if (POOL != null) {
          POOL.shutdownNow();
        }
      }
    }
  }

  /**
   * Test turning on/off archiving
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue(archivingClient.getArchivingEnabled(TABLE_NAME), "Archiving didn't get turned on");

    // 2. Turn off archiving and make sure its off
    archivingClient.disableHFileBackup();
    assertFalse(archivingClient.getArchivingEnabled(TABLE_NAME),
      "Archiving didn't get turned off.");

    // 3. Check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue(archivingClient.getArchivingEnabled(TABLE_NAME), "Archiving didn't get turned on");

    // 4. Turn off archiving and make sure its off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse(archivingClient.getArchivingEnabled(TABLE_NAME),
      "Archiving didn't get turned off for " + STRING_TABLE_NAME);
  }

  /**
   * Test that hfiles archived for a table with long-term archiving enabled are still present
   * after the archive cleaner has run over them.
   */
  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals(files, archivedFiles, "Archived files changed after running archive cleaner.");

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory
    // Should be archived
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't load archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      // the table name is read from the third ancestor directory of each archived file
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue(initialCountForPrimary > 0, "Didn't archive files for:" + STRING_TABLE_NAME);
    assertTrue(initialCountForOtherTable > 0, "Didn't archive files for:" + otherTable);

    // run the cleaners, checking for each of the directories + files (both should be deleted and
    // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    runCleaner(cleaner, finished, stop);

    // know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    // getAllFiles returns null when nothing is listed under the directory; fail with a clear
    // message instead of a NullPointerException in the loop below
    assertTrue(archivedFiles != null,
      "No files left under the archive directory after cleaning: " + archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse(tableName.equals(otherTable), "Have a file from the non-archived table: " + file);
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals(initialCountForPrimary, archivedForPrimary,
      "Not all archived files for the primary table were retained.");

    // but we still have the archive directory
    assertTrue(fs.exists(archiveDir), "Archive directory was deleted via archiver");
  }

  /**
   * Create the archive directory under the test data directory.
   */
  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  /**
   * @return the archive directory, located directly under the test data directory
   */
  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  /**
   * Resolve the directory of a table under the test data directory. As a side effect this also
   * points the configured root dir at the test data directory.
   * @param tableName name of the table
   * @return path of the table directory
   */
  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

  /**
   * Configure {@link LongTermArchivingHFileCleaner} as the cleaner plugin and build an
   * {@link HFileCleaner} over the archive directory.
   * @param conf       configuration to update and hand to the cleaner
   * @param fs         file system holding the archive directory
   * @param archiveDir directory the cleaner is created for
   * @param stop       stopper passed to the cleaner chore
   * @return the new, not yet scheduled, cleaner
   */
  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving table for given hfile cleaner
   * @param tableName table to archive
   * @param cleaner   cleaner to check to make sure change propagated
   * @return underlying {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException          on failure
   * @throws KeeperException      on failure
   * @throws InterruptedException if interrupted while waiting for the change to propagate
   */
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException, InterruptedException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table:{}", tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue(archivingClient.getArchivingEnabled(tableName), "Archiving didn't get turned on");

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // wait on the table that was actually requested, not on a hard-coded table name
    while (!delegate.archiveTracker.keepHFiles(tableName)) {
      // propagation should be fast; back off briefly rather than burning a core while polling
      Thread.sleep(10);
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @param cleaner  delegate to wrap in a spy
   * @param cleaners delegate list whose first entry is replaced by the spy
   * @param expected number of {@code getDeletableFiles} calls after which the latch is released
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
    // atomic, so that concurrent calls from the cleaner's threads can never lose an increment
    // (a lost increment would keep the latch closed forever)
    final AtomicInteger counter = new AtomicInteger();
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        int seen = counter.incrementAndGet();
        Object checkedFiles = invocation.getArgument(0);
        LOG.debug("{}/ {}) Wrapping call to getDeletableFiles for files: {}", seen, expected,
          checkedFiles);

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        // only release the latch once the real method has returned for the expected call
        if (seen >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(Mockito.anyList());
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param fs  file system to list
   * @param dir directory to investigate
   * @return all files under the directory, or {@code null} if listing {@code dir} itself yields
   *         nothing
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under:{}", dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        // recurse; a sub-directory without entries simply contributes nothing
        List<Path> subFiles = getAllFiles(fs, file.getPath());
        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }
        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }

  /**
   * Write and flush two hfiles into the region, then run a major compaction on it.
   * @param region region to load
   * @param family column family to write to
   * @throws IOException if a put, flush or the compaction fails
   */
  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue(count >= 2,
      "Don't have the expected store files, wanted >= 2 store files, but was:" + count);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region       region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Schedule the cleaner on a fresh {@link ChoreService}, block until the latch is released and
   * then signal the stopper. The chore service is registered for shutdown in {@link #tearDown()}.
   * @param cleaner  the cleaner to use
   * @param finished latch released once the cleaner has checked all the expected files
   * @param stop     stopper to signal once the cleaner is done
   * @throws InterruptedException if interrupted while waiting on the latch
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // track it before scheduling so it gets shut down even if the wait below is interrupted
    choreServices.add(choreService);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}