001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.doThrow;
024import static org.mockito.Mockito.spy;
025
026import java.io.IOException;
027import java.net.URLEncoder;
028import java.nio.charset.StandardCharsets;
029import java.util.Arrays;
030import java.util.Iterator;
031import java.util.List;
032import java.util.concurrent.ThreadLocalRandom;
033import org.apache.commons.io.FileUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Abortable;
040import org.apache.hadoop.hbase.ChoreService;
041import org.apache.hadoop.hbase.CoordinatedStateManager;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.Waiter;
048import org.apache.hadoop.hbase.ZooKeeperConnectionException;
049import org.apache.hadoop.hbase.client.ClusterConnection;
050import org.apache.hadoop.hbase.client.Connection;
051import org.apache.hadoop.hbase.master.HMaster;
052import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
053import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
054import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
055import org.apache.hadoop.hbase.testclassification.MasterTests;
056import org.apache.hadoop.hbase.testclassification.MediumTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
060import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
061import org.apache.zookeeper.KeeperException;
062import org.junit.AfterClass;
063import org.junit.Before;
064import org.junit.BeforeClass;
065import org.junit.ClassRule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071@Category({ MasterTests.class, MediumTests.class })
072public class TestLogsCleaner {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestLogsCleaner.class);
077
078  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
079  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
080
081  private static final Path OLD_WALS_DIR =
082    new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
083
084  private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs");
085
086  private static Configuration conf;
087
088  private static DirScanPool POOL;
089
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    TEST_UTIL.startMiniZKCluster();
093    TEST_UTIL.startMiniDFSCluster(1);
094    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
095  }
096
097  @AfterClass
098  public static void tearDownAfterClass() throws Exception {
099    TEST_UTIL.shutdownMiniZKCluster();
100    TEST_UTIL.shutdownMiniDFSCluster();
101    POOL.shutdownNow();
102  }
103
104  @Before
105  public void beforeTest() throws IOException {
106    conf = TEST_UTIL.getConfiguration();
107
108    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
109
110    fs.delete(OLD_WALS_DIR, true);
111
112    // root directory
113    fs.mkdirs(OLD_WALS_DIR);
114  }
115
116  /**
117   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
118   * oldWALs directory. Created files: - 2 invalid files - 5 old Procedure WALs - 30 old WALs from
119   * which 3 are in replication - 5 recent Procedure WALs - 1 recent WAL - 1 very new WAL (timestamp
120   * in future) - masterProcedureWALs subdirectory Files which should stay: - 3 replication WALs - 2
121   * new WALs - 5 latest Procedure WALs - masterProcedureWALs subdirectory
122   */
123  @Test
124  public void testLogCleaning() throws Exception {
125    // set TTLs
126    long ttlWAL = 2000;
127    long ttlProcedureWAL = 4000;
128    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
129    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
130
131    HMaster.decorateMasterConfiguration(conf);
132    Server server = new DummyServer();
133    ReplicationQueueStorage queueStorage =
134      ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
135
136    String fakeMachineName =
137      URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
138
139    final FileSystem fs = FileSystem.get(conf);
140    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
141
142    final long now = EnvironmentEdgeManager.currentTime();
143
144    // Case 1: 2 invalid files, which would be deleted directly
145    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
146    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
147
148    // Case 2: 5 Procedure WALs that are old which would be deleted
149    for (int i = 1; i <= 5; i++) {
150      final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
151      fs.createNewFile(fileName);
152    }
153
154    // Sleep for sometime to get old procedure WALs
155    Thread.sleep(ttlProcedureWAL - ttlWAL);
156
157    // Case 3: old WALs which would be deletable
158    for (int i = 1; i <= 30; i++) {
159      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
160      fs.createNewFile(fileName);
161      // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
162      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
163      if (i % (30 / 3) == 0) {
164        queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
165        LOG.info("Replication log file: " + fileName);
166      }
167    }
168
169    // Case 5: 5 Procedure WALs that are new, will stay
170    for (int i = 6; i <= 10; i++) {
171      Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
172      fs.createNewFile(fileName);
173    }
174
175    // Sleep for sometime to get newer modification time
176    Thread.sleep(ttlWAL);
177    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
178
179    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
180    // so we are not going down the chain
181    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
182
183    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
184    LOG.info("File status: {}", Arrays.toString(status));
185
186    // There should be 34 files and 1 masterProcedureWALs directory
187    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
188    // 10 procedure WALs
189    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
190
191    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
192    cleaner.chore();
193
194    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
195    // are scheduled for replication and masterProcedureWALs directory
196    TEST_UTIL.waitFor(1000,
197      (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length);
198    // In masterProcedureWALs we end up with 5 newer Procedure WALs
199    TEST_UTIL.waitFor(1000,
200      (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
201
202    if (LOG.isDebugEnabled()) {
203      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
204      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
205      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
206      LOG.debug("Kept log file for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs));
207    }
208  }
209
210  /**
211   * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting.
212   */
213  @Test
214  public void testZooKeeperAbort() throws Exception {
215    Configuration conf = TEST_UTIL.getConfiguration();
216    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
217
218    List<FileStatus> dummyFiles = Arrays.asList(
219      new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")),
220      new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2")));
221
222    try (FaultyZooKeeperWatcher faultyZK =
223      new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null)) {
224      faultyZK.init();
225      cleaner.setConf(conf, faultyZK);
226      cleaner.preClean();
227      // should keep all files due to a ConnectionLossException getting the queues znodes
228      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
229      assertFalse(toDelete.iterator().hasNext());
230      assertFalse(cleaner.isStopped());
231    }
232
233    // when zk is working both files should be returned
234    cleaner = new ReplicationLogCleaner();
235    try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) {
236      cleaner.setConf(conf, zkw);
237      cleaner.preClean();
238      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
239      Iterator<FileStatus> iter = filesToDelete.iterator();
240      assertTrue(iter.hasNext());
241      assertEquals(new Path("log1"), iter.next().getPath());
242      assertTrue(iter.hasNext());
243      assertEquals(new Path("log2"), iter.next().getPath());
244      assertFalse(iter.hasNext());
245    }
246  }
247
248  /**
249   * When zk is working both files should be returned
250   * @throws Exception from ZK watcher
251   */
252  @Test
253  public void testZooKeeperNormal() throws Exception {
254    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
255
256    // Subtract 1000 from current time so modtime is for sure older
257    // than 'now'.
258    long modTime = EnvironmentEdgeManager.currentTime() - 1000;
259    List<FileStatus> dummyFiles =
260      Arrays.asList(new FileStatus(100, false, 3, 100, modTime, new Path("log1")),
261        new FileStatus(100, false, 3, 100, modTime, new Path("log2")));
262
263    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
264    try {
265      cleaner.setConf(conf, zkw);
266      cleaner.preClean();
267      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
268      Iterator<FileStatus> iter = filesToDelete.iterator();
269      assertTrue(iter.hasNext());
270      assertEquals(new Path("log1"), iter.next().getPath());
271      assertTrue(iter.hasNext());
272      assertEquals(new Path("log2"), iter.next().getPath());
273      assertFalse(iter.hasNext());
274    } finally {
275      zkw.close();
276    }
277  }
278
279  @Test
280  public void testOnConfigurationChange() throws Exception {
281    // Prepare environments
282    Server server = new DummyServer();
283
284    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
285    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
286    int size = cleaner.getSizeOfCleaners();
287    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
288      cleaner.getCleanerThreadTimeoutMsec());
289    // Create dir and files for test
290    int numOfFiles = 10;
291    createFiles(fs, OLD_WALS_DIR, numOfFiles);
292    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
293    assertEquals(numOfFiles, status.length);
294    // Start cleaner chore
295    Thread thread = new Thread(() -> cleaner.chore());
296    thread.setDaemon(true);
297    thread.start();
298    // change size of cleaners dynamically
299    int sizeToChange = 4;
300    long threadTimeoutToChange = 30 * 1000L;
301    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
302    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
303    cleaner.onConfigurationChange(conf);
304    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
305    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
306    // Stop chore
307    thread.join();
308    status = fs.listStatus(OLD_WALS_DIR);
309    assertEquals(0, status.length);
310  }
311
312  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
313    for (int i = 0; i < numOfFiles; i++) {
314      // size of each file is 1M, 2M, or 3M
315      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
316      byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)];
317      Bytes.random(M);
318      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
319        fsdos.write(M);
320      }
321    }
322  }
323
324  static class DummyServer implements Server {
325
326    @Override
327    public Configuration getConfiguration() {
328      return TEST_UTIL.getConfiguration();
329    }
330
331    @Override
332    public ZKWatcher getZooKeeper() {
333      try {
334        return new ZKWatcher(getConfiguration(), "dummy server", this);
335      } catch (IOException e) {
336        e.printStackTrace();
337      }
338      return null;
339    }
340
341    @Override
342    public CoordinatedStateManager getCoordinatedStateManager() {
343      return null;
344    }
345
346    @Override
347    public ClusterConnection getConnection() {
348      return null;
349    }
350
351    @Override
352    public ServerName getServerName() {
353      return ServerName.valueOf("regionserver,60020,000000");
354    }
355
356    @Override
357    public void abort(String why, Throwable e) {
358    }
359
360    @Override
361    public boolean isAborted() {
362      return false;
363    }
364
365    @Override
366    public void stop(String why) {
367    }
368
369    @Override
370    public boolean isStopped() {
371      return false;
372    }
373
374    @Override
375    public ChoreService getChoreService() {
376      return null;
377    }
378
379    @Override
380    public ClusterConnection getClusterConnection() {
381      return null;
382    }
383
384    @Override
385    public FileSystem getFileSystem() {
386      return null;
387    }
388
389    @Override
390    public boolean isStopping() {
391      return false;
392    }
393
394    @Override
395    public Connection createConnection(Configuration conf) throws IOException {
396      return null;
397    }
398  }
399
400  static class FaultyZooKeeperWatcher extends ZKWatcher {
401    private RecoverableZooKeeper zk;
402
403    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
404      throws ZooKeeperConnectionException, IOException {
405      super(conf, identifier, abortable);
406    }
407
408    public void init() throws Exception {
409      this.zk = spy(super.getRecoverableZooKeeper());
410      doThrow(new KeeperException.ConnectionLossException()).when(zk)
411        .getChildren("/hbase/replication/rs", null);
412    }
413
414    @Override
415    public RecoverableZooKeeper getRecoverableZooKeeper() {
416      return zk;
417    }
418  }
419}