001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.example;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.Mockito.mock;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.CountDownLatch;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.ChoreService;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.Stoppable;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.ConnectionRegistry;
043import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
046import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
047import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
048import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
049import org.apache.hadoop.hbase.regionserver.HRegion;
050import org.apache.hadoop.hbase.regionserver.HStore;
051import org.apache.hadoop.hbase.regionserver.RegionServerServices;
052import org.apache.hadoop.hbase.security.User;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.testclassification.MiscTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.HFileArchiveUtil;
058import org.apache.hadoop.hbase.util.StoppableImplementation;
059import org.apache.hadoop.hbase.zookeeper.ZKUtil;
060import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
061import org.apache.zookeeper.KeeperException;
062import org.junit.jupiter.api.AfterAll;
063import org.junit.jupiter.api.AfterEach;
064import org.junit.jupiter.api.BeforeAll;
065import org.junit.jupiter.api.Tag;
066import org.junit.jupiter.api.Test;
067import org.mockito.Mockito;
068import org.mockito.invocation.InvocationOnMock;
069import org.mockito.stubbing.Answer;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073/**
074 * Spin up a small cluster and check that the hfiles of region are properly long-term archived as
075 * specified via the {@link ZKTableArchiveClient}.
076 */
077@Tag(MiscTests.TAG)
078@Tag(MediumTests.TAG)
079public class TestZooKeeperTableArchiveClient {
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
082  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
083  private static final String STRING_TABLE_NAME = "test";
084  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
085  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
086  private static ZKTableArchiveClient archivingClient;
087  private final List<Path> toCleanup = new ArrayList<>();
088  private static Connection CONNECTION;
089  private static RegionServerServices rss;
090  private static DirScanPool POOL;
091
  /**
   * Connection registry stub used in place of a running cluster: every operation is a no-op
   * (inherited from {@link DoNothingConnectionRegistry}) except {@link #getClusterId()}, which
   * returns a fixed id so {@code ConnectionFactory.createConnection} can succeed without a master.
   */
  public static final class MockRegistry extends DoNothingConnectionRegistry {

    public MockRegistry(Configuration conf, User user) {
      super(conf, user);
    }

    @Override
    public CompletableFuture<String> getClusterId() {
      // any non-null id satisfies the connection bootstrap
      return CompletableFuture.completedFuture("clusterId");
    }
  }
103
104  /**
105   * Setup the config for the cluster
106   */
107  @BeforeAll
108  public static void setupCluster() throws Exception {
109    setupConf(UTIL.getConfiguration());
110    UTIL.startMiniZKCluster();
111    UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
112      MockRegistry.class, ConnectionRegistry.class);
113    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
114    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
115    // make hfile archiving node so we can archive files
116    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
117    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
118    ZKUtil.createWithParents(watcher, archivingZNode);
119    rss = mock(RegionServerServices.class);
120    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
121  }
122
  /** Tune the config so compaction is triggered once three store files accumulate. */
  private static void setupConf(Configuration conf) {
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }
127
  /**
   * Per-test cleanup: delete every directory the test registered in {@link #toCleanup} and turn
   * archiving off for all tables so no state leaks into the next test.
   */
  @AfterEach
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      // best-effort: a failed delete should not fail the test run
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }
145
146  @AfterAll
147  public static void cleanupTest() throws Exception {
148    if (CONNECTION != null) {
149      CONNECTION.close();
150    }
151    UTIL.shutdownMiniZKCluster();
152    if (POOL != null) {
153      POOL.shutdownNow();
154    }
155  }
156
157  /**
158   * Test turning on/off archiving
159   */
160  @Test
161  public void testArchivingEnableDisable() throws Exception {
162    // 1. turn on hfile backups
163    LOG.debug("----Starting archiving");
164    archivingClient.enableHFileBackupAsync(TABLE_NAME);
165    assertTrue(archivingClient.getArchivingEnabled(TABLE_NAME), "Archiving didn't get turned on");
166
167    // 2. Turn off archiving and make sure its off
168    archivingClient.disableHFileBackup();
169    assertFalse(archivingClient.getArchivingEnabled(TABLE_NAME),
170      "Archiving didn't get turned off.");
171
172    // 3. Check enable/disable on a single table
173    archivingClient.enableHFileBackupAsync(TABLE_NAME);
174    assertTrue(archivingClient.getArchivingEnabled(TABLE_NAME), "Archiving didn't get turned on");
175
176    // 4. Turn off archiving and make sure its off
177    archivingClient.disableHFileBackup(TABLE_NAME);
178    assertFalse(archivingClient.getArchivingEnabled(TABLE_NAME),
179      "Archiving didn't get turned off for " + STRING_TABLE_NAME);
180  }
181
  /**
   * End-to-end check for a single archived table: generate archived hfiles via flush+compact,
   * run the cleaner chore over every archived file, and verify none were deleted (long-term
   * retention is in effect).
   */
  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    // flushing + compacting moves the pre-compaction hfiles into the archive
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    // latch opens once the cleaner delegate has examined every archived file
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals(files, archivedFiles, "Archived files changed after running archive cleaner.");

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }
225
226  /**
227   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
228   * @throws Exception on failure
229   */
230  @Test
231  public void testMultipleTables() throws Exception {
232    createArchiveDirectory();
233    String otherTable = "otherTable";
234
235    FileSystem fs = UTIL.getTestFileSystem();
236    Path archiveDir = getArchiveDir();
237    Path tableDir = getTableDir(STRING_TABLE_NAME);
238    Path otherTableDir = getTableDir(otherTable);
239
240    // register cleanup for the created directories
241    toCleanup.add(archiveDir);
242    toCleanup.add(tableDir);
243    toCleanup.add(otherTableDir);
244    Configuration conf = UTIL.getConfiguration();
245    // setup the delegate
246    Stoppable stop = new StoppableImplementation();
247    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
248    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
249    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
250    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
251    // create the region
252    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
253    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
254    List<HRegion> regions = new ArrayList<>();
255    regions.add(region);
256    Mockito.doReturn(regions).when(rss).getRegions();
257    final CompactedHFilesDischarger compactionCleaner =
258      new CompactedHFilesDischarger(100, stop, rss, false);
259    loadFlushAndCompact(region, TEST_FAM);
260    compactionCleaner.chore();
261    // create the another table that we don't archive
262    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
263    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
264    regions = new ArrayList<>();
265    regions.add(otherRegion);
266    Mockito.doReturn(regions).when(rss).getRegions();
267    final CompactedHFilesDischarger compactionCleaner1 =
268      new CompactedHFilesDischarger(100, stop, rss, false);
269    loadFlushAndCompact(otherRegion, TEST_FAM);
270    compactionCleaner1.chore();
271    // get the current hfiles in the archive directory
272    // Should be archived
273    List<Path> files = getAllFiles(fs, archiveDir);
274    if (files == null) {
275      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
276      throw new RuntimeException("Didn't load archive any files!");
277    }
278
279    // make sure we have files from both tables
280    int initialCountForPrimary = 0;
281    int initialCountForOtherTable = 0;
282    for (Path file : files) {
283      String tableName = file.getParent().getParent().getParent().getName();
284      // check to which table this file belongs
285      if (tableName.equals(otherTable)) {
286        initialCountForOtherTable++;
287      } else if (tableName.equals(STRING_TABLE_NAME)) {
288        initialCountForPrimary++;
289      }
290    }
291
292    assertTrue(initialCountForPrimary > 0, "Didn't archive files for:" + STRING_TABLE_NAME);
293    assertTrue(initialCountForOtherTable > 0, "Didn't archive files for:" + otherTable);
294
295    // run the cleaners, checking for each of the directories + files (both should be deleted and
296    // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
297    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
298    // run the cleaner
299    choreService.scheduleChore(cleaner);
300    // wait for the cleaner to check all the files
301    finished.await();
302    // stop the cleaner
303    stop.stop("");
304
305    // know the cleaner ran, so now check all the files again to make sure they are still there
306    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
307    int archivedForPrimary = 0;
308    for (Path file : archivedFiles) {
309      String tableName = file.getParent().getParent().getParent().getName();
310      // ensure we don't have files from the non-archived table
311      assertFalse(tableName.equals(otherTable), "Have a file from the non-archived table: " + file);
312      if (tableName.equals(STRING_TABLE_NAME)) {
313        archivedForPrimary++;
314      }
315    }
316
317    assertEquals(initialCountForPrimary, archivedForPrimary,
318      "Not all archived files for the primary table were retained.");
319
320    // but we still have the archive directory
321    assertTrue(fs.exists(archiveDir), "Archive directory was deleted via archiver");
322  }
323
324  private void createArchiveDirectory() throws IOException {
325    // create the archive and test directory
326    FileSystem fs = UTIL.getTestFileSystem();
327    Path archiveDir = getArchiveDir();
328    fs.mkdirs(archiveDir);
329  }
330
331  private Path getArchiveDir() throws IOException {
332    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
333  }
334
335  private Path getTableDir(String tableName) throws IOException {
336    Path testDataDir = UTIL.getDataTestDir();
337    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
338    return new Path(testDataDir, tableName);
339  }
340
341  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
342    Stoppable stop) {
343    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
344      LongTermArchivingHFileCleaner.class.getCanonicalName());
345    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
346  }
347
348  /**
349   * Start archiving table for given hfile cleaner
350   * @param tableName table to archive
351   * @param cleaner   cleaner to check to make sure change propagated
352   * @return underlying {@link LongTermArchivingHFileCleaner} that is managing archiving
353   * @throws IOException     on failure
354   * @throws KeeperException on failure
355   */
356  @SuppressWarnings("checkstyle:EmptyBlock")
357  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
358    throws IOException, KeeperException {
359    // turn on hfile retention
360    LOG.debug("----Starting archiving for table:" + tableName);
361    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
362    assertTrue(archivingClient.getArchivingEnabled(tableName), "Archiving didn't get turned on");
363
364    // wait for the archiver to get the notification
365    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
366    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
367    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
368      // spin until propagation - should be fast
369    }
370    return cleaners;
371  }
372
373  /**
374   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
375   * seen all the files
376   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
377   *         least the expected number of times.
378   */
379  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
380    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
381    // replace the cleaner with one that we can can check
382    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
383    final int[] counter = new int[] { 0 };
384    final CountDownLatch finished = new CountDownLatch(1);
385    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {
386
387      @Override
388      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
389        counter[0]++;
390        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
391          + invocation.getArgument(0));
392
393        @SuppressWarnings("unchecked")
394        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
395        if (counter[0] >= expected) {
396          finished.countDown();
397        }
398
399        return ret;
400      }
401    }).when(delegateSpy).getDeletableFiles(Mockito.anyList());
402    cleaners.set(0, delegateSpy);
403
404    return finished;
405  }
406
407  /**
408   * Get all the files (non-directory entries) in the file system under the passed directory
409   * @param dir directory to investigate
410   * @return all files under the directory
411   */
412  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
413    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
414    if (files == null) {
415      LOG.warn("No files under:" + dir);
416      return null;
417    }
418
419    List<Path> allFiles = new ArrayList<>();
420    for (FileStatus file : files) {
421      if (file.isDirectory()) {
422        List<Path> subFiles = getAllFiles(fs, file.getPath());
423
424        if (subFiles != null) {
425          allFiles.addAll(subFiles);
426        }
427
428        continue;
429      }
430      allFiles.add(file.getPath());
431    }
432    return allFiles;
433  }
434
435  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
436    // create two hfiles in the region
437    createHFileInRegion(region, family);
438    createHFileInRegion(region, family);
439
440    HStore s = region.getStore(family);
441    int count = s.getStorefilesCount();
442    assertTrue(count >= 2,
443      "Don't have the expected store files, wanted >= 2 store files, but was:" + count);
444
445    // compact the two files into one file to get files in the archive
446    LOG.debug("Compacting stores");
447    region.compact(true);
448  }
449
450  /**
451   * Create a new hfile in the passed region
452   * @param region       region to operate on
453   * @param columnFamily family for which to add data
454   * @throws IOException if doing the put or flush fails
455   */
456  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
457    // put one row in the region
458    Put p = new Put(Bytes.toBytes("row"));
459    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
460    region.put(p);
461    // flush the region to make a store file
462    region.flush(true);
463  }
464
465  /**
466   * @param cleaner the cleaner to use
467   */
468  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
469    throws InterruptedException {
470    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
471    // run the cleaner
472    choreService.scheduleChore(cleaner);
473    // wait for the cleaner to check all the files
474    finished.await();
475    // stop the cleaner
476    stop.stop("");
477  }
478}