/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of a region are properly long-term archived
 * as specified via the {@link ZKTableArchiveClient}.
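 * <p>
 * A rough sketch (not the authoritative contract) of the client calls this test exercises, where
 * {@code conf}, {@code connection} and {@code tableName} stand in for the test's own setup:
 * </p>
 *
 * <pre>
 * ZKTableArchiveClient client = new ZKTableArchiveClient(conf, connection);
 * client.enableHFileBackupAsync(tableName);            // request long-term archiving (async)
 * boolean on = client.getArchivingEnabled(tableName);  // verify the change propagated
 * client.disableHFileBackup(tableName);                // stop archiving one table
 * client.disableHFileBackup();                         // stop archiving for all tables
 * </pre>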
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static Connection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

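  /**
   * Minimal registry stub: reports a fixed cluster id so that a client {@link Connection} can be
   * created against the mini ZK cluster without standing up a full HBase cluster.
   */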
  public static final class MockRegistry extends DoNothingConnectionRegistry {

    public MockRegistry(Configuration conf, User user) {
      super(conf, user);
    }

    @Override
    public CompletableFuture<String> getClusterId() {
      return CompletableFuture.completedFuture("clusterId");
    }
  }

  /**
   * Set up the config for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
      MockRegistry.class, ConnectionRegistry.class);
    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // create the hfile archiving znode so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // require at least 3 store files before compacting
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // clean up each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    if (CONNECTION != null) {
      CONNECTION.close();
    }
    UTIL.shutdownMiniZKCluster();
    if (POOL != null) {
      POOL.shutdownNow();
    }
  }

  /**
   * Test turning archiving on and off
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.",
      archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. Check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }

  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // the cleaner has run, so check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory; both tables should have archived files
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
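      // the archive mirrors the table layout (.../&lt;table&gt;/&lt;region&gt;/&lt;family&gt;/&lt;hfile&gt;),
      // so the table directory sits three levels above each archived file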
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for: " + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for: " + otherTable, initialCountForOtherTable > 0);

    // run the cleaners, checking for each of the directories + files (both should be deleted and
    // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table'
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");

    // the cleaner has run, so check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
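      // as above, the table directory is three levels above each archived file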
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
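    // side effect: point the root dir at the test data dir so that archive path resolution
    // (e.g. HFileArchiveUtil.getArchivePath) finds the archive created under it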
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
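    // register the long-term archiving delegate as the only hfile cleaner plugin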
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving the given table for the given hfile cleaner
   * @param tableName table to archive
   * @param cleaner   cleaner to check to make sure the change propagated
   * @return the cleaner delegates; the first is the {@link LongTermArchivingHFileCleaner} that is
   *         managing archiving
   * @throws IOException     on failure
   * @throws KeeperException on failure
   */
  @SuppressWarnings("checkstyle:EmptyBlock")
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table: " + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
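    // single-element array: a mutable counter that the anonymous Answer below can capture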
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/" + expected + ": Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(Mockito.anyList());
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param fs  filesystem to search
   * @param dir directory to investigate
   * @return all files under the directory, or <tt>null</tt> if none were found
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under: " + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());

        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }

        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }

  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Don't have the expected store files, wanted >= 2 store files, but was: " + count,
      count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region       region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Run the given cleaner and wait until it has checked the expected number of files
   * @param cleaner  the cleaner to use
   * @param finished latch from {@link #setupCleanerWatching} that releases when the cleaner is done
   * @param stop     stoppable used to halt the cleaner chore
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}