/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.DummyConnectionRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spin up a small cluster and check that the hfiles of the region are properly long-term archived
 * as specified via the {@link ZKTableArchiveClient}.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestZooKeeperTableArchiveClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestZooKeeperTableArchiveClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperTableArchiveClient.class);
  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<>();
  private static Connection CONNECTION;
  private static RegionServerServices rss;
  private static DirScanPool POOL;

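  /**
   * Registry stub for these tests: no running cluster is required, so this just supplies a fixed
   * cluster id.
   */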
  public static final class MockRegistry extends DummyConnectionRegistry {

    public MockRegistry(Configuration conf) {
    }

    @Override
    public CompletableFuture<String> getClusterId() {
      return CompletableFuture.completedFuture("clusterId");
    }
  }

  /**
   * Set up the configuration for the cluster
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniZKCluster();
    UTIL.getConfiguration().setClass(MockRegistry.REGISTRY_IMPL_CONF_KEY, MockRegistry.class,
      DummyConnectionRegistry.class);
    CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
    // make hfile archiving node so we can archive files
    ZKWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
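    // mock the region server services so the discharger chores can look up the test regions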
    rss = mock(RegionServerServices.class);
    POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
  }

  private static void setupConf(Configuration conf) {
    // compact only when there are at least 3 store files
    conf.setInt("hbase.hstore.compaction.min", 3);
  }

  @After
  public void tearDown() throws Exception {
    try {
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        CommonFSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    if (CONNECTION != null) {
      CONNECTION.close();
    }
    UTIL.shutdownMiniZKCluster();
    if (POOL != null) {
      POOL.shutdownNow();
    }
  }

  /**
   * Test turning on/off archiving
   */
  @Test
  public void testArchivingEnableDisable() throws Exception {
    // 1. turn on hfile backups
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 2. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup();
    assertFalse("Archiving didn't get turned off.",
      archivingClient.getArchivingEnabled(TABLE_NAME));

    // 3. Check enable/disable on a single table
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(TABLE_NAME));

    // 4. Turn off archiving and make sure it's off
    archivingClient.disableHFileBackup(TABLE_NAME);
    assertFalse("Archiving didn't get turned off for " + STRING_TABLE_NAME,
      archivingClient.getArchivingEnabled(TABLE_NAME));
  }

  @Test
  public void testArchivingOnSingleTable() throws Exception {
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
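    // the discharger chore moves store files that compaction has replaced into the archive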
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    runCleaner(cleaner, finished, stop);

    // the cleaner has run, so check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }

  /**
   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    // create the region
    ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    List<HRegion> regions = new ArrayList<>();
    regions.add(region);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(region, TEST_FAM);
    compactionCleaner.chore();
    // create another table that we don't archive
    hcd = ColumnFamilyDescriptorBuilder.of(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    regions = new ArrayList<>();
    regions.add(otherRegion);
    Mockito.doReturn(regions).when(rss).getRegions();
    final CompactedHFilesDischarger compactionCleaner1 =
      new CompactedHFilesDischarger(100, stop, rss, false);
    loadFlushAndCompact(otherRegion, TEST_FAM);
    compactionCleaner1.chore();
    // get the current hfiles in the archive directory
    // Should be archived
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      CommonFSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) {
        initialCountForOtherTable++;
      } else if (tableName.equals(STRING_TABLE_NAME)) {
        initialCountForPrimary++;
      }
    }

    assertTrue("Didn't archive files for: " + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for: " + otherTable, initialCountForOtherTable > 0);

    // run the cleaners. The latch counts one check per archived file, plus checks of the three
    // directories under 'otherTable' (which should be deleted along with its files); the files
    // under 'table' should be retained
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner and shut down its chore service
    stop.stop("");
    choreService.shutdown();

    // the cleaner has run, so check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) {
        archivedForPrimary++;
      }
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // but we still have the archive directory
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
  }

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
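    // point the root dir at the test data dir so table/archive paths resolve under it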
    CommonFSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

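  /**
   * Configure the {@link LongTermArchivingHFileCleaner} as the only hfile cleaner plugin and
   * create the cleaner chore over the archive directory.
   */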
  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
    Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
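    // 1000 ms is the period at which the cleaner chore runs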
    return new HFileCleaner(1000, stop, conf, fs, archiveDir, POOL);
  }

  /**
   * Start archiving the given table for the given hfile cleaner
   * @param tableName table to archive
   * @param cleaner   cleaner to check to make sure change propagated
   * @return the cleaner delegates, with the {@link LongTermArchivingHFileCleaner} that manages
   *         archiving at index 0
   * @throws IOException     on failure
   * @throws KeeperException on failure
   */
  @SuppressWarnings("checkstyle:EmptyBlock")
  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
    throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table: " + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @param cleaner  delegate to spy on
   * @param cleaners list of delegates in which to install the spy
   * @param expected number of times the cleaner must be called before the latch releases
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
    List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
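    // single-element array so the anonymous Answer below can mutate the call count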
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {

      @Override
      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/" + expected + ") Wrapping call to getDeletableFiles for files: "
          + invocation.getArgument(0));

        @SuppressWarnings("unchecked")
        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) {
          finished.countDown();
        }

        return ret;
      }
    }).when(delegateSpy).getDeletableFiles(Mockito.anyList());
    cleaners.set(0, delegateSpy);

    return finished;
  }

  /**
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param fs  file system to inspect
   * @param dir directory to investigate
   * @return all files under the directory, or null if the directory has no entries
   */
  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = CommonFSUtils.listStatus(fs, dir, null);
    if (files == null) {
      LOG.warn("No files under: " + dir);
      return null;
    }

    List<Path> allFiles = new ArrayList<>();
    for (FileStatus file : files) {
      if (file.isDirectory()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());

        if (subFiles != null) {
          allFiles.addAll(subFiles);
        }

        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }

  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    HStore s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Expected at least 2 store files, but found: " + count, count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compact(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region       region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException if doing the put or flush fails
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.addColumn(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flush(true);
  }

  /**
   * Run the given cleaner until it has checked all the files
   * @param cleaner  the cleaner to use
   * @param finished latch that releases once the cleaner has seen all the files
   * @param stop     stoppable used to halt the cleaner chore
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
    throws InterruptedException {
    final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
    // run the cleaner
    choreService.scheduleChore(cleaner);
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner and shut down its chore service
    stop.stop("");
    choreService.shutdown();
  }
}