001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.doAnswer;
024import static org.mockito.Mockito.doThrow;
025import static org.mockito.Mockito.spy;
026
027import java.io.IOException;
028import java.net.URLEncoder;
029import java.nio.charset.StandardCharsets;
030import java.util.Arrays;
031import java.util.Iterator;
032import java.util.List;
033import java.util.concurrent.ThreadLocalRandom;
034import java.util.concurrent.atomic.AtomicBoolean;
035import org.apache.commons.io.FileUtils;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.Abortable;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.Waiter;
047import org.apache.hadoop.hbase.ZooKeeperConnectionException;
048import org.apache.hadoop.hbase.master.HMaster;
049import org.apache.hadoop.hbase.replication.ReplicationException;
050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
052import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
053import org.apache.hadoop.hbase.testclassification.MasterTests;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.util.MockServer;
058import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
059import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
060import org.apache.zookeeper.KeeperException;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.mockito.invocation.InvocationOnMock;
068import org.mockito.stubbing.Answer;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072@Category({ MasterTests.class, MediumTests.class })
073public class TestLogsCleaner {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077    HBaseClassTestRule.forClass(TestLogsCleaner.class);
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
080  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
081
082  private static final Path OLD_WALS_DIR =
083    new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
084
085  private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs");
086
087  private static Configuration conf;
088
089  private static DirScanPool POOL;
090
091  @BeforeClass
092  public static void setUpBeforeClass() throws Exception {
093    TEST_UTIL.startMiniZKCluster();
094    TEST_UTIL.startMiniDFSCluster(1);
095    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
096  }
097
098  @AfterClass
099  public static void tearDownAfterClass() throws Exception {
100    TEST_UTIL.shutdownMiniZKCluster();
101    TEST_UTIL.shutdownMiniDFSCluster();
102    POOL.shutdownNow();
103  }
104
105  @Before
106  public void beforeTest() throws IOException {
107    conf = TEST_UTIL.getConfiguration();
108
109    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
110
111    fs.delete(OLD_WALS_DIR, true);
112
113    // root directory
114    fs.mkdirs(OLD_WALS_DIR);
115  }
116
117  /**
118   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
119   * oldWALs directory. Created files: - 2 invalid files - 5 old Procedure WALs - 30 old WALs from
120   * which 3 are in replication - 5 recent Procedure WALs - 1 recent WAL - 1 very new WAL (timestamp
121   * in future) - masterProcedureWALs subdirectory Files which should stay: - 3 replication WALs - 2
122   * new WALs - 5 latest Procedure WALs - masterProcedureWALs subdirectory
123   */
124  @Test
125  public void testLogCleaning() throws Exception {
126    // set TTLs
127    long ttlWAL = 2000;
128    long ttlProcedureWAL = 4000;
129    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
130    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
131
132    HMaster.decorateMasterConfiguration(conf);
133    Server server = new DummyServer();
134    ReplicationQueueStorage queueStorage =
135      ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
136
137    String fakeMachineName =
138      URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
139
140    final FileSystem fs = FileSystem.get(conf);
141    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
142
143    final long now = EnvironmentEdgeManager.currentTime();
144
145    // Case 1: 2 invalid files, which would be deleted directly
146    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
147    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
148
149    // Case 2: 5 Procedure WALs that are old which would be deleted
150    for (int i = 1; i <= 5; i++) {
151      final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
152      fs.createNewFile(fileName);
153    }
154
155    // Sleep for sometime to get old procedure WALs
156    Thread.sleep(ttlProcedureWAL - ttlWAL);
157
158    // Case 3: old WALs which would be deletable
159    for (int i = 1; i <= 30; i++) {
160      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
161      fs.createNewFile(fileName);
162      // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
163      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
164      if (i % (30 / 3) == 0) {
165        queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
166        LOG.info("Replication log file: " + fileName);
167      }
168    }
169
170    // Case 5: 5 Procedure WALs that are new, will stay
171    for (int i = 6; i <= 10; i++) {
172      Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
173      fs.createNewFile(fileName);
174    }
175
176    // Sleep for sometime to get newer modification time
177    Thread.sleep(ttlWAL);
178    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
179
180    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
181    // so we are not going down the chain
182    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
183
184    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
185    LOG.info("File status: {}", Arrays.toString(status));
186
187    // There should be 34 files and 1 masterProcedureWALs directory
188    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
189    // 10 procedure WALs
190    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
191
192    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
193    cleaner.chore();
194
195    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
196    // are scheduled for replication and masterProcedureWALs directory
197    TEST_UTIL.waitFor(1000,
198      (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length);
199    // In masterProcedureWALs we end up with 5 newer Procedure WALs
200    TEST_UTIL.waitFor(1000,
201      (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
202
203    if (LOG.isDebugEnabled()) {
204      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
205      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
206      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
207      LOG.debug("Kept log file for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs));
208    }
209  }
210
211  @Test
212  public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception {
213    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
214
215    List<FileStatus> dummyFiles = Arrays.asList(
216      new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")),
217      new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2")));
218
219    FaultyZooKeeperWatcher faultyZK =
220      new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
221    final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
222
223    try {
224      faultyZK.init(false);
225      ReplicationQueueStorage queueStorage =
226        spy(ReplicationStorageFactory.getReplicationQueueStorage(faultyZK, conf));
227      doAnswer(new Answer<Object>() {
228        @Override
229        public Object answer(InvocationOnMock invocation) throws Throwable {
230          try {
231            return invocation.callRealMethod();
232          } catch (ReplicationException e) {
233            LOG.debug("Caught Exception", e);
234            getListOfReplicatorsFailed.set(true);
235            throw e;
236          }
237        }
238      }).when(queueStorage).getAllWALs();
239
240      cleaner.setConf(conf, faultyZK, queueStorage);
241      // should keep all files due to a ConnectionLossException getting the queues znodes
242      cleaner.preClean();
243      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
244
245      assertTrue(getListOfReplicatorsFailed.get());
246      assertFalse(toDelete.iterator().hasNext());
247      assertFalse(cleaner.isStopped());
248
249      // zk recovery.
250      faultyZK.init(true);
251      cleaner.preClean();
252      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
253      Iterator<FileStatus> iter = filesToDelete.iterator();
254      assertTrue(iter.hasNext());
255      assertEquals(new Path("log1"), iter.next().getPath());
256      assertTrue(iter.hasNext());
257      assertEquals(new Path("log2"), iter.next().getPath());
258      assertFalse(iter.hasNext());
259
260    } finally {
261      faultyZK.close();
262    }
263  }
264
265  /**
266   * When zk is working both files should be returned
267   * @throws Exception from ZK watcher
268   */
269  @Test
270  public void testZooKeeperNormal() throws Exception {
271    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
272
273    // Subtract 1000 from current time so modtime is for sure older
274    // than 'now'.
275    long modTime = EnvironmentEdgeManager.currentTime() - 1000;
276    List<FileStatus> dummyFiles =
277      Arrays.asList(new FileStatus(100, false, 3, 100, modTime, new Path("log1")),
278        new FileStatus(100, false, 3, 100, modTime, new Path("log2")));
279
280    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
281    try {
282      cleaner.setConf(conf, zkw);
283      cleaner.preClean();
284      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
285      Iterator<FileStatus> iter = filesToDelete.iterator();
286      assertTrue(iter.hasNext());
287      assertEquals(new Path("log1"), iter.next().getPath());
288      assertTrue(iter.hasNext());
289      assertEquals(new Path("log2"), iter.next().getPath());
290      assertFalse(iter.hasNext());
291    } finally {
292      zkw.close();
293    }
294  }
295
296  @Test
297  public void testOnConfigurationChange() throws Exception {
298    // Prepare environments
299    Server server = new DummyServer();
300
301    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
302    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
303    int size = cleaner.getSizeOfCleaners();
304    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
305      cleaner.getCleanerThreadTimeoutMsec());
306    // Create dir and files for test
307    int numOfFiles = 10;
308    createFiles(fs, OLD_WALS_DIR, numOfFiles);
309    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
310    assertEquals(numOfFiles, status.length);
311    // Start cleaner chore
312    Thread thread = new Thread(() -> cleaner.chore());
313    thread.setDaemon(true);
314    thread.start();
315    // change size of cleaners dynamically
316    int sizeToChange = 4;
317    long threadTimeoutToChange = 30 * 1000L;
318    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
319    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
320    cleaner.onConfigurationChange(conf);
321    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
322    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
323    // Stop chore
324    thread.join();
325    status = fs.listStatus(OLD_WALS_DIR);
326    assertEquals(0, status.length);
327  }
328
329  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
330    for (int i = 0; i < numOfFiles; i++) {
331      // size of each file is 1M, 2M, or 3M
332      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
333      byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)];
334      Bytes.random(M);
335      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
336        fsdos.write(M);
337      }
338    }
339  }
340
341  static class DummyServer extends MockServer {
342
343    @Override
344    public Configuration getConfiguration() {
345      return TEST_UTIL.getConfiguration();
346    }
347
348    @Override
349    public ZKWatcher getZooKeeper() {
350      try {
351        return new ZKWatcher(getConfiguration(), "dummy server", this);
352      } catch (IOException e) {
353        e.printStackTrace();
354      }
355      return null;
356    }
357  }
358
359  static class FaultyZooKeeperWatcher extends ZKWatcher {
360    private RecoverableZooKeeper zk;
361
362    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
363      throws ZooKeeperConnectionException, IOException {
364      super(conf, identifier, abortable);
365    }
366
367    public void init(boolean autoRecovery) throws Exception {
368      this.zk = spy(super.getRecoverableZooKeeper());
369      if (!autoRecovery) {
370        doThrow(new KeeperException.ConnectionLossException()).when(zk)
371          .getChildren("/hbase/replication/rs", null);
372      }
373    }
374
375    @Override
376    public RecoverableZooKeeper getRecoverableZooKeeper() {
377      return zk;
378    }
379  }
380}