001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.doThrow;
024import static org.mockito.Mockito.spy;
025
026import java.io.IOException;
027import java.net.URLEncoder;
028import java.nio.charset.StandardCharsets;
029import java.util.Arrays;
030import java.util.Iterator;
031import java.util.List;
032import java.util.concurrent.ThreadLocalRandom;
033import org.apache.commons.io.FileUtils;
034import org.apache.commons.lang3.RandomUtils;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FSDataOutputStream;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Abortable;
041import org.apache.hadoop.hbase.ChoreService;
042import org.apache.hadoop.hbase.CoordinatedStateManager;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.Waiter;
049import org.apache.hadoop.hbase.ZooKeeperConnectionException;
050import org.apache.hadoop.hbase.client.ClusterConnection;
051import org.apache.hadoop.hbase.client.Connection;
052import org.apache.hadoop.hbase.master.HMaster;
053import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
054import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
055import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
059import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
060import org.apache.zookeeper.KeeperException;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070@Category({MasterTests.class, MediumTests.class})
071public class TestLogsCleaner {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075      HBaseClassTestRule.forClass(TestLogsCleaner.class);
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
078  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
079
080  private static final Path OLD_WALS_DIR =
081      new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
082
083  private static final Path OLD_PROCEDURE_WALS_DIR =
084      new Path(OLD_WALS_DIR, "masterProcedureWALs");
085
086  private static Configuration conf;
087
088  private static DirScanPool POOL;
089
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    TEST_UTIL.startMiniZKCluster();
093    TEST_UTIL.startMiniDFSCluster(1);
094    POOL = new DirScanPool(TEST_UTIL.getConfiguration());
095  }
096
097  @AfterClass
098  public static void tearDownAfterClass() throws Exception {
099    TEST_UTIL.shutdownMiniZKCluster();
100    TEST_UTIL.shutdownMiniDFSCluster();
101    POOL.shutdownNow();
102  }
103
104  @Before
105  public void beforeTest() throws IOException {
106    conf = TEST_UTIL.getConfiguration();
107
108    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
109
110    fs.delete(OLD_WALS_DIR, true);
111
112    // root directory
113    fs.mkdirs(OLD_WALS_DIR);
114  }
115
116  /**
117   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located
118   * in the same oldWALs directory.
119   * Created files:
120   * - 2 invalid files
121   * - 5 old Procedure WALs
122   * - 30 old WALs from which 3 are in replication
123   * - 5 recent Procedure WALs
124   * - 1 recent WAL
125   * - 1 very new WAL (timestamp in future)
126   * - masterProcedureWALs subdirectory
127   * Files which should stay:
128   * - 3 replication WALs
129   * - 2 new WALs
130   * - 5 latest Procedure WALs
131   * - masterProcedureWALs subdirectory
132   */
133  @Test
134  public void testLogCleaning() throws Exception {
135    // set TTLs
136    long ttlWAL = 2000;
137    long ttlProcedureWAL = 4000;
138    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
139    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
140
141    HMaster.decorateMasterConfiguration(conf);
142    Server server = new DummyServer();
143    ReplicationQueueStorage queueStorage =
144        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
145
146    String fakeMachineName = URLEncoder.encode(
147        server.getServerName().toString(), StandardCharsets.UTF_8.name());
148
149    final FileSystem fs = FileSystem.get(conf);
150    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
151
152    final long now = System.currentTimeMillis();
153
154    // Case 1: 2 invalid files, which would be deleted directly
155    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
156    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
157
158    // Case 2: 5 Procedure WALs that are old which would be deleted
159    for (int i = 1; i <= 5; i++) {
160      final Path fileName =
161          new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
162      fs.createNewFile(fileName);
163    }
164
165    // Sleep for sometime to get old procedure WALs
166    Thread.sleep(ttlProcedureWAL - ttlWAL);
167
168    // Case 3: old WALs which would be deletable
169    for (int i = 1; i <= 30; i++) {
170      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
171      fs.createNewFile(fileName);
172      // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
173      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
174      if (i % (30 / 3) == 0) {
175        queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
176        LOG.info("Replication log file: " + fileName);
177      }
178    }
179
180    // Case 5: 5 Procedure WALs that are new, will stay
181    for (int i = 6; i <= 10; i++) {
182      Path fileName =
183          new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
184      fs.createNewFile(fileName);
185    }
186
187    // Sleep for sometime to get newer modification time
188    Thread.sleep(ttlWAL);
189    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
190
191    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
192    // so we are not going down the chain
193    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
194
195    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
196    LOG.info("File status: {}", Arrays.toString(status));
197
198    // There should be 34 files and 1 masterProcedureWALs directory
199    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
200    // 10 procedure WALs
201    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
202
203    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL);
204    cleaner.chore();
205
206    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
207    // are scheduled for replication and masterProcedureWALs directory
208    TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 6 == fs
209        .listStatus(OLD_WALS_DIR).length);
210    // In masterProcedureWALs we end up with 5 newer Procedure WALs
211    TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 5 == fs
212        .listStatus(OLD_PROCEDURE_WALS_DIR).length);
213
214    if (LOG.isDebugEnabled()) {
215      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
216      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
217      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
218      LOG.debug("Kept log file for masterProcedureWALs: {}",
219          Arrays.toString(statusProcedureWALs));
220    }
221  }
222
223  /**
224   * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting.
225   */
226  @Test
227  public void testZooKeeperAbort() throws Exception {
228    Configuration conf = TEST_UTIL.getConfiguration();
229    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
230
231    List<FileStatus> dummyFiles = Arrays.asList(
232        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
233        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
234    );
235
236    try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf,
237        "testZooKeeperAbort-faulty", null)) {
238      faultyZK.init();
239      cleaner.setConf(conf, faultyZK);
240      cleaner.preClean();
241      // should keep all files due to a ConnectionLossException getting the queues znodes
242      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
243      assertFalse(toDelete.iterator().hasNext());
244      assertFalse(cleaner.isStopped());
245    }
246
247    // when zk is working both files should be returned
248    cleaner = new ReplicationLogCleaner();
249    try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) {
250      cleaner.setConf(conf, zkw);
251      cleaner.preClean();
252      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
253      Iterator<FileStatus> iter = filesToDelete.iterator();
254      assertTrue(iter.hasNext());
255      assertEquals(new Path("log1"), iter.next().getPath());
256      assertTrue(iter.hasNext());
257      assertEquals(new Path("log2"), iter.next().getPath());
258      assertFalse(iter.hasNext());
259    }
260  }
261
262  /**
263   * When zk is working both files should be returned
264   * @throws Exception from ZK watcher
265   */
266  @Test
267  public void testZooKeeperNormal() throws Exception {
268    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
269
270    List<FileStatus> dummyFiles = Arrays.asList(
271        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
272        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
273    );
274
275    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
276    try {
277      cleaner.setConf(conf, zkw);
278      cleaner.preClean();
279      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
280      Iterator<FileStatus> iter = filesToDelete.iterator();
281      assertTrue(iter.hasNext());
282      assertEquals(new Path("log1"), iter.next().getPath());
283      assertTrue(iter.hasNext());
284      assertEquals(new Path("log2"), iter.next().getPath());
285      assertFalse(iter.hasNext());
286    } finally {
287      zkw.close();
288    }
289  }
290
291  @Test
292  public void testOnConfigurationChange() throws Exception {
293    // Prepare environments
294    Server server = new DummyServer();
295
296    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
297    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL);
298    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners());
299    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
300        cleaner.getCleanerThreadTimeoutMsec());
301    // Create dir and files for test
302    int numOfFiles = 10;
303    createFiles(fs, OLD_WALS_DIR, numOfFiles);
304    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
305    assertEquals(numOfFiles, status.length);
306    // Start cleaner chore
307    Thread thread = new Thread(() -> cleaner.chore());
308    thread.setDaemon(true);
309    thread.start();
310    // change size of cleaners dynamically
311    int sizeToChange = 4;
312    long threadTimeoutToChange = 30 * 1000L;
313    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange);
314    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
315    cleaner.onConfigurationChange(conf);
316    assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
317    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
318    // Stop chore
319    thread.join();
320    status = fs.listStatus(OLD_WALS_DIR);
321    assertEquals(0, status.length);
322  }
323
324  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
325    for (int i = 0; i < numOfFiles; i++) {
326      // size of each file is 1M, 2M, or 3M
327      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
328      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
329        byte[] M = RandomUtils.nextBytes(Math.toIntExact(FileUtils.ONE_MB * xMega));
330        fsdos.write(M);
331      }
332    }
333  }
334
335  static class DummyServer implements Server {
336
337    @Override
338    public Configuration getConfiguration() {
339      return TEST_UTIL.getConfiguration();
340    }
341
342    @Override
343    public ZKWatcher getZooKeeper() {
344      try {
345        return new ZKWatcher(getConfiguration(), "dummy server", this);
346      } catch (IOException e) {
347        e.printStackTrace();
348      }
349      return null;
350    }
351
352    @Override
353    public CoordinatedStateManager getCoordinatedStateManager() {
354      return null;
355    }
356
357    @Override
358    public ClusterConnection getConnection() {
359      return null;
360    }
361
362    @Override
363    public ServerName getServerName() {
364      return ServerName.valueOf("regionserver,60020,000000");
365    }
366
367    @Override
368    public void abort(String why, Throwable e) {}
369
370    @Override
371    public boolean isAborted() {
372      return false;
373    }
374
375    @Override
376    public void stop(String why) {}
377
378    @Override
379    public boolean isStopped() {
380      return false;
381    }
382
383    @Override
384    public ChoreService getChoreService() {
385      return null;
386    }
387
388    @Override
389    public ClusterConnection getClusterConnection() {
390      return null;
391    }
392
393    @Override
394    public FileSystem getFileSystem() {
395      return null;
396    }
397
398    @Override
399    public boolean isStopping() {
400      return false;
401    }
402
403    @Override
404    public Connection createConnection(Configuration conf) throws IOException {
405      return null;
406    }
407  }
408
409  static class FaultyZooKeeperWatcher extends ZKWatcher {
410    private RecoverableZooKeeper zk;
411
412    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
413        throws ZooKeeperConnectionException, IOException {
414      super(conf, identifier, abortable);
415    }
416
417    public void init() throws Exception {
418      this.zk = spy(super.getRecoverableZooKeeper());
419      doThrow(new KeeperException.ConnectionLossException())
420        .when(zk).getChildren("/hbase/replication/rs", null);
421    }
422
423    @Override
424    public RecoverableZooKeeper getRecoverableZooKeeper() {
425      return zk;
426    }
427  }
428}