001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
079
080@Category({MasterTests.class, MediumTests.class})
081public class TestLogsCleaner {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085      HBaseClassTestRule.forClass(TestLogsCleaner.class);
086
087  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
088  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
089
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    TEST_UTIL.startMiniZKCluster();
093    TEST_UTIL.startMiniDFSCluster(1);
094  }
095
096  @AfterClass
097  public static void tearDownAfterClass() throws Exception {
098    TEST_UTIL.shutdownMiniZKCluster();
099    TEST_UTIL.shutdownMiniDFSCluster();
100  }
101
102  /**
103   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located
104   * in the same oldWALs directory.
105   * Created files:
106   * - 2 invalid files
107   * - 5 old Procedure WALs
108   * - 30 old WALs from which 3 are in replication
109   * - 5 recent Procedure WALs
110   * - 1 recent WAL
111   * - 1 very new WAL (timestamp in future)
112   * - masterProcedureWALs subdirectory
113   * Files which should stay:
114   * - 3 replication WALs
115   * - 2 new WALs
116   * - 5 latest Procedure WALs
117   * - masterProcedureWALs subdirectory
118   */
119  @Test
120  public void testLogCleaning() throws Exception {
121    Configuration conf = TEST_UTIL.getConfiguration();
122    // set TTLs
123    long ttlWAL = 2000;
124    long ttlProcedureWAL = 4000;
125    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
126    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
127
128    HMaster.decorateMasterConfiguration(conf);
129    Server server = new DummyServer();
130    ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(
131        new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
132    repQueues.init(server.getServerName().toString());
133    final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
134    final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
135    String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");
136
137    final FileSystem fs = FileSystem.get(conf);
138
139    long now = System.currentTimeMillis();
140    fs.delete(oldLogDir, true);
141    fs.mkdirs(oldLogDir);
142
143    // Case 1: 2 invalid files, which would be deleted directly
144    fs.createNewFile(new Path(oldLogDir, "a"));
145    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
146
147    // Case 2: 5 Procedure WALs that are old which would be deleted
148    for (int i = 1; i < 6; i++) {
149      Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i));
150      fs.createNewFile(fileName);
151    }
152
153    // Sleep for sometime to get old procedure WALs
154    Thread.sleep(ttlProcedureWAL - ttlWAL);
155
156    // Case 3: old WALs which would be deletable
157    for (int i = 1; i < 31; i++) {
158      Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i));
159      fs.createNewFile(fileName);
160      // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
161      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
162      if (i % (30 / 3) == 1) {
163        repQueues.addLog(fakeMachineName, fileName.getName());
164        LOG.info("Replication log file: " + fileName);
165      }
166    }
167
168    // Case 5: 5 Procedure WALs that are new, will stay
169    for (int i = 6; i < 11; i++) {
170      Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i));
171      fs.createNewFile(fileName);
172    }
173
174    // Sleep for sometime to get newer modification time
175    Thread.sleep(ttlWAL);
176    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
177
178    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
179    // so we are not going down the chain
180    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + ttlWAL)));
181
182    for (FileStatus stat : fs.listStatus(oldLogDir)) {
183      LOG.info(stat.getPath().toString());
184    }
185
186    // There should be 34 files and masterProcedureWALs directory
187    assertEquals(35, fs.listStatus(oldLogDir).length);
188    // 10 procedure WALs
189    assertEquals(10, fs.listStatus(oldProcedureWALDir).length);
190
191    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
192    cleaner.chore();
193
194    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
195    // are scheduled for replication and masterProcedureWALs directory
196    TEST_UTIL.waitFor(1000,
197        (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(oldLogDir).length);
198    // In masterProcedureWALs we end up with 5 newer Procedure WALs
199    TEST_UTIL.waitFor(1000,
200        (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(oldProcedureWALDir).length);
201
202    for (FileStatus file : fs.listStatus(oldLogDir)) {
203      LOG.debug("Kept log file in oldWALs: " + file.getPath().getName());
204    }
205    for (FileStatus file : fs.listStatus(oldProcedureWALDir)) {
206      LOG.debug("Kept log file in masterProcedureWALs: " + file.getPath().getName());
207    }
208  }
209
210  @Test
211  public void testZnodeCversionChange() throws Exception {
212    Configuration conf = TEST_UTIL.getConfiguration();
213    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
214    cleaner.setConf(conf);
215
216    ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class);
217    Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
218
219    Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
220    rqc.setAccessible(true);
221
222    rqc.set(cleaner, rqcMock);
223
224    // This should return eventually when cversion stabilizes
225    cleaner.getDeletableFiles(new LinkedList<>());
226  }
227
228  @Test(timeout=10000)
229  public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception {
230    Configuration conf = TEST_UTIL.getConfiguration();
231
232    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
233
234    List<FileStatus> dummyFiles = Lists.newArrayList(
235        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
236        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
237    );
238
239    FaultyZooKeeperWatcher faultyZK =
240        new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
241    final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
242
243    try {
244      faultyZK.init();
245      ReplicationQueuesClient replicationQueuesClient = spy(ReplicationFactory.getReplicationQueuesClient(
246        new ReplicationQueuesClientArguments(conf, new ReplicationLogCleaner.WarnOnlyAbortable(), faultyZK)));
247      doAnswer(new Answer<Object>() {
248        @Override
249        public Object answer(InvocationOnMock invocation) throws Throwable {
250          try {
251            return invocation.callRealMethod();
252          } catch (KeeperException.ConnectionLossException e) {
253            getListOfReplicatorsFailed.set(true);
254            throw e;
255          }
256        }
257      }).when(replicationQueuesClient).getListOfReplicators();
258      replicationQueuesClient.init();
259
260      cleaner.setConf(conf, faultyZK, replicationQueuesClient);
261      // should keep all files due to a ConnectionLossException getting the queues znodes
262      cleaner.preClean();
263      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
264
265      assertTrue(getListOfReplicatorsFailed.get());
266      assertFalse(toDelete.iterator().hasNext());
267      assertFalse(cleaner.isStopped());
268    } finally {
269      faultyZK.close();
270    }
271  }
272
273  /**
274   * When zk is working both files should be returned
275   * @throws Exception
276   */
277  @Test(timeout=10000)
278  public void testZooKeeperNormal() throws Exception {
279    Configuration conf = TEST_UTIL.getConfiguration();
280    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
281
282    List<FileStatus> dummyFiles = Lists.newArrayList(
283        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
284        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
285    );
286    
287    ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
288    try {
289      cleaner.setConf(conf, zkw);
290      cleaner.preClean();
291      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
292      Iterator<FileStatus> iter = filesToDelete.iterator();
293      assertTrue(iter.hasNext());
294      assertEquals(new Path("log1"), iter.next().getPath());
295      assertTrue(iter.hasNext());
296      assertEquals(new Path("log2"), iter.next().getPath());
297      assertFalse(iter.hasNext());
298    } finally {
299      zkw.close();
300    }
301  }
302
303  @Test
304  public void testOnConfigurationChange() throws Exception {
305    Configuration conf = TEST_UTIL.getConfiguration();
306    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE,
307        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
308    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
309        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
310    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
311        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
312    // Prepare environments
313    Server server = new DummyServer();
314    Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
315        HConstants.HREGION_OLDLOGDIR_NAME);
316    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
317    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
318    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners());
319    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
320        cleaner.getCleanerThreadTimeoutMsec());
321    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
322        cleaner.getCleanerThreadCheckIntervalMsec());
323    // Create dir and files for test
324    fs.delete(oldWALsDir, true);
325    fs.mkdirs(oldWALsDir);
326    int numOfFiles = 10;
327    createFiles(fs, oldWALsDir, numOfFiles);
328    FileStatus[] status = fs.listStatus(oldWALsDir);
329    assertEquals(numOfFiles, status.length);
330    // Start cleaner chore
331    Thread thread = new Thread(() -> cleaner.chore());
332    thread.setDaemon(true);
333    thread.start();
334    // change size of cleaners dynamically
335    int sizeToChange = 4;
336    long threadTimeoutToChange = 30 * 1000L;
337    long threadCheckIntervalToChange = 250L;
338    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange);
339    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
340    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
341        threadCheckIntervalToChange);
342    cleaner.onConfigurationChange(conf);
343    assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
344    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
345    assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec());
346    // Stop chore
347    thread.join();
348    status = fs.listStatus(oldWALsDir);
349    assertEquals(0, status.length);
350  }
351
352  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
353    Random random = new Random();
354    for (int i = 0; i < numOfFiles; i++) {
355      int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
356      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
357        for (int m = 0; m < xMega; m++) {
358          byte[] M = new byte[1024 * 1024];
359          random.nextBytes(M);
360          fsdos.write(M);
361        }
362      }
363    }
364  }
365
366  static class DummyServer implements Server {
367
368    @Override
369    public Configuration getConfiguration() {
370      return TEST_UTIL.getConfiguration();
371    }
372
373    @Override
374    public ZKWatcher getZooKeeper() {
375      try {
376        return new ZKWatcher(getConfiguration(), "dummy server", this);
377      } catch (IOException e) {
378        e.printStackTrace();
379      }
380      return null;
381    }
382
383    @Override
384    public CoordinatedStateManager getCoordinatedStateManager() {
385      return null;
386    }
387
388    @Override
389    public ClusterConnection getConnection() {
390      return null;
391    }
392
393    @Override
394    public MetaTableLocator getMetaTableLocator() {
395      return null;
396    }
397
398    @Override
399    public ServerName getServerName() {
400      return ServerName.valueOf("regionserver,60020,000000");
401    }
402
403    @Override
404    public void abort(String why, Throwable e) {}
405
406    @Override
407    public boolean isAborted() {
408      return false;
409    }
410
411    @Override
412    public void stop(String why) {}
413
414    @Override
415    public boolean isStopped() {
416      return false;
417    }
418
419    @Override
420    public ChoreService getChoreService() {
421      return null;
422    }
423
424    @Override
425    public ClusterConnection getClusterConnection() {
426      return null;
427    }
428
429    @Override
430    public FileSystem getFileSystem() {
431      return null;
432    }
433
434    @Override
435    public boolean isStopping() {
436      return false;
437    }
438
439    @Override
440    public Connection createConnection(Configuration conf) throws IOException {
441      return null;
442    }
443  }
444
445  static class FaultyZooKeeperWatcher extends ZKWatcher {
446    private RecoverableZooKeeper zk;
447
448    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
449        throws ZooKeeperConnectionException, IOException {
450      super(conf, identifier, abortable);
451    }
452
453    public void init() throws Exception {
454      this.zk = spy(super.getRecoverableZooKeeper());
455      doThrow(new KeeperException.ConnectionLossException())
456        .when(zk).getChildren("/hbase/replication/rs", null);
457    }
458
459    @Override
460    public RecoverableZooKeeper getRecoverableZooKeeper() {
461      return zk;
462    }
463  }
464}