001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.net.URLEncoder;
026import java.nio.charset.StandardCharsets;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.concurrent.ThreadLocalRandom;
031import org.apache.commons.io.FileUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataOutputStream;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.AsyncClusterConnection;
043import org.apache.hadoop.hbase.client.TableDescriptor;
044import org.apache.hadoop.hbase.master.HMaster;
045import org.apache.hadoop.hbase.master.MasterServices;
046import org.apache.hadoop.hbase.master.ServerManager;
047import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
048import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
049import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
050import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
051import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
052import org.apache.hadoop.hbase.replication.ReplicationQueueId;
053import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
054import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
055import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
056import org.apache.hadoop.hbase.testclassification.MasterTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.MockServer;
061import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
062import org.junit.jupiter.api.AfterAll;
063import org.junit.jupiter.api.BeforeAll;
064import org.junit.jupiter.api.BeforeEach;
065import org.junit.jupiter.api.Tag;
066import org.junit.jupiter.api.Test;
067import org.junit.jupiter.api.TestInfo;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
072
073@Tag(MasterTests.TAG)
074@Tag(MediumTests.TAG)
075public class TestLogsCleaner {
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
078  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
079
080  private static final Path OLD_WALS_DIR =
081    new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
082
083  private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs");
084
085  private static Configuration conf;
086
087  private static DirScanPool POOL;
088
089  private static String peerId = "1";
090
091  private MasterServices masterServices;
092
093  private ReplicationQueueStorage queueStorage;
094
095  @BeforeAll
096  public static void setUpBeforeClass() throws Exception {
097    TEST_UTIL.startMiniCluster();
098    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
099  }
100
101  @AfterAll
102  public static void tearDownAfterClass() throws Exception {
103    TEST_UTIL.shutdownMiniCluster();
104    POOL.shutdownNow();
105  }
106
107  @BeforeEach
108  public void beforeTest(TestInfo testInfo) throws Exception {
109    conf = TEST_UTIL.getConfiguration();
110
111    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
112
113    fs.delete(OLD_WALS_DIR, true);
114
115    // root directory
116    fs.mkdirs(OLD_WALS_DIR);
117
118    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
119    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
120    TEST_UTIL.getAdmin().createTable(td);
121    TEST_UTIL.waitTableAvailable(tableName);
122    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(),
123      conf, tableName);
124
125    masterServices = mock(MasterServices.class);
126    when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
127    when(masterServices.getReplicationLogCleanerBarrier())
128      .thenReturn(new ReplicationLogCleanerBarrier());
129    AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class);
130    when(masterServices.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
131    when(asyncClusterConnection.isClosed()).thenReturn(false);
132    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
133    when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
134    when(rpm.getQueueStorage()).thenReturn(queueStorage);
135    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
136    ServerManager sm = mock(ServerManager.class);
137    when(masterServices.getServerManager()).thenReturn(sm);
138    when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
139    @SuppressWarnings("unchecked")
140    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
141    when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
142    when(procExec.getProcedures()).thenReturn(Collections.emptyList());
143  }
144
145  /**
146   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
147   * oldWALs directory.
148   * <p/>
149   * Created files:
150   * <ul>
151   * <li>2 invalid files</li>
152   * <li>5 old Procedure WALs</li>
153   * <li>30 old WALs from which 3 are in replication</li>
154   * <li>5 recent Procedure WALs</li>
155   * <li>1 recent WAL</li>
156   * <li>1 very new WAL (timestamp in future)</li>
157   * <li>masterProcedureWALs subdirectory</li>
158   * </ul>
159   * Files which should stay:
160   * <ul>
161   * <li>3 replication WALs</li>
162   * <li>2 new WALs</li>
163   * <li>5 latest Procedure WALs</li>
164   * <li>masterProcedureWALs subdirectory</li>
165   * </ul>
166   */
167  @Test
168  public void testLogCleaning() throws Exception {
169    // set TTLs
170    long ttlWAL = 2000;
171    long ttlProcedureWAL = 4000;
172    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
173    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
174
175    HMaster.decorateMasterConfiguration(conf);
176    Server server = new DummyServer();
177    String fakeMachineName =
178      URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
179
180    final FileSystem fs = FileSystem.get(conf);
181    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
182
183    final long now = EnvironmentEdgeManager.currentTime();
184
185    // Case 1: 2 invalid files, which would be deleted directly
186    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
187    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
188
189    // Case 2: 5 Procedure WALs that are old which would be deleted
190    for (int i = 1; i <= 5; i++) {
191      final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
192      fs.createNewFile(fileName);
193    }
194
195    // Sleep for sometime to get old procedure WALs
196    Thread.sleep(ttlProcedureWAL - ttlWAL);
197
198    // Case 3: old WALs which would be deletable
199    for (int i = 1; i <= 30; i++) {
200      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
201      fs.createNewFile(fileName);
202    }
203    // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
204    masterServices.getReplicationPeerManager().listPeers(null)
205      .add(new ReplicationPeerDescription(peerId, true, null, null));
206    queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
207      new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());
208    // Case 5: 5 Procedure WALs that are new, will stay
209    for (int i = 6; i <= 10; i++) {
210      Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
211      fs.createNewFile(fileName);
212    }
213
214    // Sleep for sometime to get newer modification time
215    Thread.sleep(ttlWAL);
216    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
217
218    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
219    // so we are not going down the chain
220    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
221
222    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
223    LOG.info("File status: {}", Arrays.toString(status));
224
225    // There should be 34 files and 1 masterProcedureWALs directory
226    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
227    // 10 procedure WALs
228    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
229
230    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
231      ImmutableMap.of(HMaster.MASTER, masterServices));
232    cleaner.chore();
233
234    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
235    // are scheduled for replication and masterProcedureWALs directory
236    TEST_UTIL.waitFor(1000,
237      (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length);
238    // In masterProcedureWALs we end up with 5 newer Procedure WALs
239    TEST_UTIL.waitFor(1000,
240      (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
241
242    if (LOG.isDebugEnabled()) {
243      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
244      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
245      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
246      LOG.debug("Kept log file for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs));
247    }
248  }
249
250  @Test
251  public void testOnConfigurationChange() throws Exception {
252    // Prepare environments
253    Server server = new DummyServer();
254
255    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
256    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL,
257      ImmutableMap.of(HMaster.MASTER, masterServices));
258    int size = cleaner.getSizeOfCleaners();
259    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
260      cleaner.getCleanerThreadTimeoutMsec());
261    // Create dir and files for test
262    int numOfFiles = 10;
263    createFiles(fs, OLD_WALS_DIR, numOfFiles);
264    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
265    assertEquals(numOfFiles, status.length);
266    // Start cleaner chore
267    Thread thread = new Thread(() -> cleaner.chore());
268    thread.setDaemon(true);
269    thread.start();
270    // change size of cleaners dynamically
271    int sizeToChange = 4;
272    long threadTimeoutToChange = 30 * 1000L;
273    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
274    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
275    cleaner.onConfigurationChange(conf);
276    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
277    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
278    // Stop chore
279    thread.join();
280    status = fs.listStatus(OLD_WALS_DIR);
281    assertEquals(0, status.length);
282  }
283
284  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
285    for (int i = 0; i < numOfFiles; i++) {
286      // size of each file is 1M, 2M, or 3M
287      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
288      byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)];
289      Bytes.random(M);
290      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
291        fsdos.write(M);
292      }
293    }
294  }
295
296  private static final class DummyServer extends MockServer {
297
298    @Override
299    public Configuration getConfiguration() {
300      return TEST_UTIL.getConfiguration();
301    }
302
303    @Override
304    public ZKWatcher getZooKeeper() {
305      try {
306        return new ZKWatcher(getConfiguration(), "dummy server", this);
307      } catch (IOException e) {
308        e.printStackTrace();
309      }
310      return null;
311    }
312  }
313}