/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of a region are properly long-term archived as
 * specified via the {@link ZKTableArchiveClient}.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static ClusterConnection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

  /**
   * Set up the config for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    CONNECTION = (ClusterConnection) ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // make hfile archiving node so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
    rss = mock(RegionServerServices.class);
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // only start compactions once there are at least 3 store files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    if (CONNECTION != null) {
      CONNECTION.close();
    }
    UTIL.shutdownMiniZKCluster();
    if (POOL != null) {
      POOL.shutdownNow();
    }
  }

  /**
   * Test turning on/off archiving
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. Check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }

  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
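    // The discharger below runs with a 100 ms chore period; the trailing 'false' appears to
    // disable handing the work off to the region server's executor, so compacted files are
    // discharged inline when chore() is invoked directly in this test.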
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // now that the cleaner has run, check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory
    // Should be archived
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for:" + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for:" + otherTable, initialCountForOtherTable > 0);

    // run the cleaners, checking each of the directories and files in 'otherTable' (both should
    // be deleted, so both need to be checked) and the files in the primary table (which should
    // be retained)
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");

    // now that the cleaner has run, check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
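    // repoint the root dir at the test data dir so that archive paths resolved from the
    // configuration (e.g. via HFileArchiveUtil.getArchivePath) land under it; this reading of the
    // side effect is an assumption based on how the single-table test checks the archive path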
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
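    // 1000 is the cleaner's chore period in milliseconds (going by the HFileCleaner constructor);
    // the tests schedule this chore on a ChoreService, so a short period keeps the runs prompt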
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving the given table and wait for the given hfile cleaner to see the change
   * @param tableName table to archive
   * @param cleaner   cleaner to check to make sure change propagated
   * @return the cleaner's delegates, the first of which is the underlying
   *         {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException     on failure
   * @throws KeeperException on failure
   */
  @SuppressWarnings("checkstyle:EmptyBlock")
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table:" + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(tableName)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
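    // a single-element array serves as a mutable counter so the anonymous Answer below can
    // update it despite the effectively-final requirement for captured locals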
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param dir directory to investigate
   * @return all files under the directory
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under:" + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());

        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }

        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }

  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Don't have the expected store files, wanted >= 2 store files, but was:" + count,
      count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
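    // 'true' requests a major compaction; the replaced store files later land in the archive
    // directory once the CompactedHFilesDischarger chore runs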
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region       region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Schedule the given cleaner and wait until it has examined the expected files
   * @param cleaner  the cleaner to use
   * @param finished latch that releases once the cleaner has seen all the expected files
   * @param stop     used to stop the cleaner once the latch releases
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}