001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.net.URLEncoder;
026import java.nio.charset.StandardCharsets;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.concurrent.ThreadLocalRandom;
031import org.apache.commons.io.FileUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataOutputStream;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.Server;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.TableNameTestRule;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.client.AsyncClusterConnection;
045import org.apache.hadoop.hbase.client.TableDescriptor;
046import org.apache.hadoop.hbase.master.HMaster;
047import org.apache.hadoop.hbase.master.MasterServices;
048import org.apache.hadoop.hbase.master.ServerManager;
049import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
050import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
051import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
052import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
053import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
054import org.apache.hadoop.hbase.replication.ReplicationQueueId;
055import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
056import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
057import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
058import org.apache.hadoop.hbase.testclassification.MasterTests;
059import org.apache.hadoop.hbase.testclassification.MediumTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
062import org.apache.hadoop.hbase.util.MockServer;
063import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
064import org.junit.AfterClass;
065import org.junit.Before;
066import org.junit.BeforeClass;
067import org.junit.ClassRule;
068import org.junit.Rule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
075
076@Category({ MasterTests.class, MediumTests.class })
077public class TestLogsCleaner {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestLogsCleaner.class);
082
083  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
084  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
085
086  private static final Path OLD_WALS_DIR =
087    new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
088
089  private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs");
090
091  private static Configuration conf;
092
093  private static DirScanPool POOL;
094
095  private static String peerId = "1";
096
097  private MasterServices masterServices;
098
099  private ReplicationQueueStorage queueStorage;
100
101  @Rule
102  public final TableNameTestRule tableNameRule = new TableNameTestRule();
103
104  @BeforeClass
105  public static void setUpBeforeClass() throws Exception {
106    TEST_UTIL.startMiniCluster();
107    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
108  }
109
110  @AfterClass
111  public static void tearDownAfterClass() throws Exception {
112    TEST_UTIL.shutdownMiniCluster();
113    POOL.shutdownNow();
114  }
115
116  @Before
117  public void beforeTest() throws Exception {
118    conf = TEST_UTIL.getConfiguration();
119
120    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
121
122    fs.delete(OLD_WALS_DIR, true);
123
124    // root directory
125    fs.mkdirs(OLD_WALS_DIR);
126
127    TableName tableName = tableNameRule.getTableName();
128    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
129    TEST_UTIL.getAdmin().createTable(td);
130    TEST_UTIL.waitTableAvailable(tableName);
131    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(),
132      conf, tableName);
133
134    masterServices = mock(MasterServices.class);
135    when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
136    when(masterServices.getReplicationLogCleanerBarrier())
137      .thenReturn(new ReplicationLogCleanerBarrier());
138    AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class);
139    when(masterServices.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
140    when(asyncClusterConnection.isClosed()).thenReturn(false);
141    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
142    when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
143    when(rpm.getQueueStorage()).thenReturn(queueStorage);
144    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
145    ServerManager sm = mock(ServerManager.class);
146    when(masterServices.getServerManager()).thenReturn(sm);
147    when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
148    @SuppressWarnings("unchecked")
149    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
150    when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
151    when(procExec.getProcedures()).thenReturn(Collections.emptyList());
152  }
153
154  /**
155   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
156   * oldWALs directory.
157   * <p/>
158   * Created files:
159   * <ul>
160   * <li>2 invalid files</li>
161   * <li>5 old Procedure WALs</li>
162   * <li>30 old WALs from which 3 are in replication</li>
163   * <li>5 recent Procedure WALs</li>
164   * <li>1 recent WAL</li>
165   * <li>1 very new WAL (timestamp in future)</li>
166   * <li>masterProcedureWALs subdirectory</li>
167   * </ul>
168   * Files which should stay:
169   * <ul>
170   * <li>3 replication WALs</li>
171   * <li>2 new WALs</li>
172   * <li>5 latest Procedure WALs</li>
173   * <li>masterProcedureWALs subdirectory</li>
174   * </ul>
175   */
176  @Test
177  public void testLogCleaning() throws Exception {
178    // set TTLs
179    long ttlWAL = 2000;
180    long ttlProcedureWAL = 4000;
181    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
182    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
183
184    HMaster.decorateMasterConfiguration(conf);
185    Server server = new DummyServer();
186    String fakeMachineName =
187      URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
188
189    final FileSystem fs = FileSystem.get(conf);
190    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
191
192    final long now = EnvironmentEdgeManager.currentTime();
193
194    // Case 1: 2 invalid files, which would be deleted directly
195    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
196    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
197
198    // Case 2: 5 Procedure WALs that are old which would be deleted
199    for (int i = 1; i <= 5; i++) {
200      final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
201      fs.createNewFile(fileName);
202    }
203
204    // Sleep for sometime to get old procedure WALs
205    Thread.sleep(ttlProcedureWAL - ttlWAL);
206
207    // Case 3: old WALs which would be deletable
208    for (int i = 1; i <= 30; i++) {
209      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
210      fs.createNewFile(fileName);
211    }
212    // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
213    masterServices.getReplicationPeerManager().listPeers(null)
214      .add(new ReplicationPeerDescription(peerId, true, null, null));
215    queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
216      new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());
217    // Case 5: 5 Procedure WALs that are new, will stay
218    for (int i = 6; i <= 10; i++) {
219      Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
220      fs.createNewFile(fileName);
221    }
222
223    // Sleep for sometime to get newer modification time
224    Thread.sleep(ttlWAL);
225    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
226
227    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
228    // so we are not going down the chain
229    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
230
231    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
232    LOG.info("File status: {}", Arrays.toString(status));
233
234    // There should be 34 files and 1 masterProcedureWALs directory
235    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
236    // 10 procedure WALs
237    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
238
239    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
240      ImmutableMap.of(HMaster.MASTER, masterServices));
241    cleaner.chore();
242
243    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
244    // are scheduled for replication and masterProcedureWALs directory
245    TEST_UTIL.waitFor(1000,
246      (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length);
247    // In masterProcedureWALs we end up with 5 newer Procedure WALs
248    TEST_UTIL.waitFor(1000,
249      (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
250
251    if (LOG.isDebugEnabled()) {
252      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
253      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
254      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
255      LOG.debug("Kept log file for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs));
256    }
257  }
258
259  @Test
260  public void testOnConfigurationChange() throws Exception {
261    // Prepare environments
262    Server server = new DummyServer();
263
264    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
265    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL,
266      ImmutableMap.of(HMaster.MASTER, masterServices));
267    int size = cleaner.getSizeOfCleaners();
268    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
269      cleaner.getCleanerThreadTimeoutMsec());
270    // Create dir and files for test
271    int numOfFiles = 10;
272    createFiles(fs, OLD_WALS_DIR, numOfFiles);
273    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
274    assertEquals(numOfFiles, status.length);
275    // Start cleaner chore
276    Thread thread = new Thread(() -> cleaner.chore());
277    thread.setDaemon(true);
278    thread.start();
279    // change size of cleaners dynamically
280    int sizeToChange = 4;
281    long threadTimeoutToChange = 30 * 1000L;
282    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
283    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
284    cleaner.onConfigurationChange(conf);
285    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
286    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
287    // Stop chore
288    thread.join();
289    status = fs.listStatus(OLD_WALS_DIR);
290    assertEquals(0, status.length);
291  }
292
293  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
294    for (int i = 0; i < numOfFiles; i++) {
295      // size of each file is 1M, 2M, or 3M
296      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
297      byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)];
298      Bytes.random(M);
299      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
300        fsdos.write(M);
301      }
302    }
303  }
304
305  private static final class DummyServer extends MockServer {
306
307    @Override
308    public Configuration getConfiguration() {
309      return TEST_UTIL.getConfiguration();
310    }
311
312    @Override
313    public ZKWatcher getZooKeeper() {
314      try {
315        return new ZKWatcher(getConfiguration(), "dummy server", this);
316      } catch (IOException e) {
317        e.printStackTrace();
318      }
319      return null;
320    }
321  }
322}