001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.doThrow;
024import static org.mockito.Mockito.spy;
025
026import java.io.IOException;
027import java.net.URLEncoder;
028import java.nio.charset.StandardCharsets;
029import java.util.Arrays;
030import java.util.Iterator;
031import java.util.List;
032import java.util.concurrent.ThreadLocalRandom;
033import org.apache.commons.io.FileUtils;
034import org.apache.commons.lang3.RandomUtils;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FSDataOutputStream;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Abortable;
041import org.apache.hadoop.hbase.ChoreService;
042import org.apache.hadoop.hbase.CoordinatedStateManager;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.Waiter;
049import org.apache.hadoop.hbase.ZooKeeperConnectionException;
050import org.apache.hadoop.hbase.client.ClusterConnection;
051import org.apache.hadoop.hbase.client.Connection;
052import org.apache.hadoop.hbase.master.HMaster;
053import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
054import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
055import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
059import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
060import org.apache.zookeeper.KeeperException;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070@Category({MasterTests.class, MediumTests.class})
071public class TestLogsCleaner {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075      HBaseClassTestRule.forClass(TestLogsCleaner.class);
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
078  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
079
080  private static final Path OLD_WALS_DIR =
081      new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
082
083  private static final Path OLD_PROCEDURE_WALS_DIR =
084      new Path(OLD_WALS_DIR, "masterProcedureWALs");
085
086  private static Configuration conf;
087
088  private static DirScanPool POOL;
089
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    TEST_UTIL.startMiniZKCluster();
093    TEST_UTIL.startMiniDFSCluster(1);
094    POOL = new DirScanPool(TEST_UTIL.getConfiguration());
095  }
096
097  @AfterClass
098  public static void tearDownAfterClass() throws Exception {
099    TEST_UTIL.shutdownMiniZKCluster();
100    TEST_UTIL.shutdownMiniDFSCluster();
101    POOL.shutdownNow();
102  }
103
104  @Before
105  public void beforeTest() throws IOException {
106    conf = TEST_UTIL.getConfiguration();
107
108    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
109
110    fs.delete(OLD_WALS_DIR, true);
111
112    // root directory
113    fs.mkdirs(OLD_WALS_DIR);
114  }
115
116  /**
117   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located
118   * in the same oldWALs directory.
119   * Created files:
120   * - 2 invalid files
121   * - 5 old Procedure WALs
122   * - 30 old WALs from which 3 are in replication
123   * - 5 recent Procedure WALs
124   * - 1 recent WAL
125   * - 1 very new WAL (timestamp in future)
126   * - masterProcedureWALs subdirectory
127   * Files which should stay:
128   * - 3 replication WALs
129   * - 2 new WALs
130   * - 5 latest Procedure WALs
131   * - masterProcedureWALs subdirectory
132   */
133  @Test
134  public void testLogCleaning() throws Exception {
135    // set TTLs
136    long ttlWAL = 2000;
137    long ttlProcedureWAL = 4000;
138    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
139    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
140
141    HMaster.decorateMasterConfiguration(conf);
142    Server server = new DummyServer();
143    ReplicationQueueStorage queueStorage =
144        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
145
146    String fakeMachineName = URLEncoder.encode(
147        server.getServerName().toString(), StandardCharsets.UTF_8.name());
148
149    final FileSystem fs = FileSystem.get(conf);
150    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
151
152    final long now = System.currentTimeMillis();
153
154    // Case 1: 2 invalid files, which would be deleted directly
155    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
156    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
157
158    // Case 2: 5 Procedure WALs that are old which would be deleted
159    for (int i = 1; i <= 5; i++) {
160      final Path fileName =
161          new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
162      fs.createNewFile(fileName);
163    }
164
165    // Sleep for sometime to get old procedure WALs
166    Thread.sleep(ttlProcedureWAL - ttlWAL);
167
168    // Case 3: old WALs which would be deletable
169    for (int i = 1; i <= 30; i++) {
170      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
171      fs.createNewFile(fileName);
172      // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
173      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
174      if (i % (30 / 3) == 0) {
175        queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
176        LOG.info("Replication log file: " + fileName);
177      }
178    }
179
180    // Case 5: 5 Procedure WALs that are new, will stay
181    for (int i = 6; i <= 10; i++) {
182      Path fileName =
183          new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
184      fs.createNewFile(fileName);
185    }
186
187    // Sleep for sometime to get newer modification time
188    Thread.sleep(ttlWAL);
189    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
190
191    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
192    // so we are not going down the chain
193    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
194
195    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
196    LOG.info("File status: {}", Arrays.toString(status));
197
198    // There should be 34 files and 1 masterProcedureWALs directory
199    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
200    // 10 procedure WALs
201    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
202
203    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL);
204    cleaner.chore();
205
206    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
207    // are scheduled for replication and masterProcedureWALs directory
208    TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 6 == fs
209        .listStatus(OLD_WALS_DIR).length);
210    // In masterProcedureWALs we end up with 5 newer Procedure WALs
211    TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 5 == fs
212        .listStatus(OLD_PROCEDURE_WALS_DIR).length);
213
214    if (LOG.isDebugEnabled()) {
215      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
216      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
217      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
218      LOG.debug("Kept log file for masterProcedureWALs: {}",
219          Arrays.toString(statusProcedureWALs));
220    }
221  }
222
223  /**
224   * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting.
225   */
226  @Test
227  public void testZooKeeperAbort() throws Exception {
228    Configuration conf = TEST_UTIL.getConfiguration();
229    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
230
231    List<FileStatus> dummyFiles = Arrays.asList(
232        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
233        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
234    );
235
236    try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf,
237        "testZooKeeperAbort-faulty", null)) {
238      faultyZK.init();
239      cleaner.setConf(conf, faultyZK);
240      cleaner.preClean();
241      // should keep all files due to a ConnectionLossException getting the queues znodes
242      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
243      assertFalse(toDelete.iterator().hasNext());
244      assertFalse(cleaner.isStopped());
245    }
246
247    // when zk is working both files should be returned
248    cleaner = new ReplicationLogCleaner();
249    try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) {
250      cleaner.setConf(conf, zkw);
251      cleaner.preClean();
252      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
253      Iterator<FileStatus> iter = filesToDelete.iterator();
254      assertTrue(iter.hasNext());
255      assertEquals(new Path("log1"), iter.next().getPath());
256      assertTrue(iter.hasNext());
257      assertEquals(new Path("log2"), iter.next().getPath());
258      assertFalse(iter.hasNext());
259    }
260  }
261
262  /**
263   * When zk is working both files should be returned
264   * @throws Exception from ZK watcher
265   */
266  @Test
267  public void testZooKeeperNormal() throws Exception {
268    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
269
270    // Subtract 1000 from current time so modtime is for sure older
271    // than 'now'.
272    long modTime = System.currentTimeMillis() - 1000;
273    List<FileStatus> dummyFiles = Arrays.asList(
274        new FileStatus(100, false, 3, 100, modTime, new Path("log1")),
275        new FileStatus(100, false, 3, 100, modTime, new Path("log2"))
276    );
277
278    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
279    try {
280      cleaner.setConf(conf, zkw);
281      cleaner.preClean();
282      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
283      Iterator<FileStatus> iter = filesToDelete.iterator();
284      assertTrue(iter.hasNext());
285      assertEquals(new Path("log1"), iter.next().getPath());
286      assertTrue(iter.hasNext());
287      assertEquals(new Path("log2"), iter.next().getPath());
288      assertFalse(iter.hasNext());
289    } finally {
290      zkw.close();
291    }
292  }
293
294  @Test
295  public void testOnConfigurationChange() throws Exception {
296    // Prepare environments
297    Server server = new DummyServer();
298
299    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
300    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL);
301    int size = cleaner.getSizeOfCleaners();
302    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
303        cleaner.getCleanerThreadTimeoutMsec());
304    // Create dir and files for test
305    int numOfFiles = 10;
306    createFiles(fs, OLD_WALS_DIR, numOfFiles);
307    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
308    assertEquals(numOfFiles, status.length);
309    // Start cleaner chore
310    Thread thread = new Thread(() -> cleaner.chore());
311    thread.setDaemon(true);
312    thread.start();
313    // change size of cleaners dynamically
314    int sizeToChange = 4;
315    long threadTimeoutToChange = 30 * 1000L;
316    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
317    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
318    cleaner.onConfigurationChange(conf);
319    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
320    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
321    // Stop chore
322    thread.join();
323    status = fs.listStatus(OLD_WALS_DIR);
324    assertEquals(0, status.length);
325  }
326
327  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
328    for (int i = 0; i < numOfFiles; i++) {
329      // size of each file is 1M, 2M, or 3M
330      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
331      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
332        byte[] M = RandomUtils.nextBytes(Math.toIntExact(FileUtils.ONE_MB * xMega));
333        fsdos.write(M);
334      }
335    }
336  }
337
338  static class DummyServer implements Server {
339
340    @Override
341    public Configuration getConfiguration() {
342      return TEST_UTIL.getConfiguration();
343    }
344
345    @Override
346    public ZKWatcher getZooKeeper() {
347      try {
348        return new ZKWatcher(getConfiguration(), "dummy server", this);
349      } catch (IOException e) {
350        e.printStackTrace();
351      }
352      return null;
353    }
354
355    @Override
356    public CoordinatedStateManager getCoordinatedStateManager() {
357      return null;
358    }
359
360    @Override
361    public ClusterConnection getConnection() {
362      return null;
363    }
364
365    @Override
366    public ServerName getServerName() {
367      return ServerName.valueOf("regionserver,60020,000000");
368    }
369
370    @Override
371    public void abort(String why, Throwable e) {}
372
373    @Override
374    public boolean isAborted() {
375      return false;
376    }
377
378    @Override
379    public void stop(String why) {}
380
381    @Override
382    public boolean isStopped() {
383      return false;
384    }
385
386    @Override
387    public ChoreService getChoreService() {
388      return null;
389    }
390
391    @Override
392    public ClusterConnection getClusterConnection() {
393      return null;
394    }
395
396    @Override
397    public FileSystem getFileSystem() {
398      return null;
399    }
400
401    @Override
402    public boolean isStopping() {
403      return false;
404    }
405
406    @Override
407    public Connection createConnection(Configuration conf) throws IOException {
408      return null;
409    }
410  }
411
412  static class FaultyZooKeeperWatcher extends ZKWatcher {
413    private RecoverableZooKeeper zk;
414
415    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
416        throws ZooKeeperConnectionException, IOException {
417      super(conf, identifier, abortable);
418    }
419
420    public void init() throws Exception {
421      this.zk = spy(super.getRecoverableZooKeeper());
422      doThrow(new KeeperException.ConnectionLossException())
423        .when(zk).getChildren("/hbase/replication/rs", null);
424    }
425
426    @Override
427    public RecoverableZooKeeper getRecoverableZooKeeper() {
428      return zk;
429    }
430  }
431}