001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.net.URLEncoder;
026import java.nio.charset.StandardCharsets;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.concurrent.ThreadLocalRandom;
031import org.apache.commons.io.FileUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataOutputStream;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.Server;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.TableNameTestRule;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.client.TableDescriptor;
045import org.apache.hadoop.hbase.master.HMaster;
046import org.apache.hadoop.hbase.master.MasterServices;
047import org.apache.hadoop.hbase.master.ServerManager;
048import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
049import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
050import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
051import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
052import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
053import org.apache.hadoop.hbase.replication.ReplicationQueueId;
054import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
055import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
056import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
057import org.apache.hadoop.hbase.testclassification.MasterTests;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.MockServer;
062import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
063import org.junit.AfterClass;
064import org.junit.Before;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
074
075@Category({ MasterTests.class, MediumTests.class })
076public class TestLogsCleaner {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestLogsCleaner.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
083  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
084
085  private static final Path OLD_WALS_DIR =
086    new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
087
088  private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs");
089
090  private static Configuration conf;
091
092  private static DirScanPool POOL;
093
094  private static String peerId = "1";
095
096  private MasterServices masterServices;
097
098  private ReplicationQueueStorage queueStorage;
099
100  @Rule
101  public final TableNameTestRule tableNameRule = new TableNameTestRule();
102
103  @BeforeClass
104  public static void setUpBeforeClass() throws Exception {
105    TEST_UTIL.startMiniCluster();
106    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
107  }
108
109  @AfterClass
110  public static void tearDownAfterClass() throws Exception {
111    TEST_UTIL.shutdownMiniCluster();
112    POOL.shutdownNow();
113  }
114
115  @Before
116  public void beforeTest() throws Exception {
117    conf = TEST_UTIL.getConfiguration();
118
119    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
120
121    fs.delete(OLD_WALS_DIR, true);
122
123    // root directory
124    fs.mkdirs(OLD_WALS_DIR);
125
126    TableName tableName = tableNameRule.getTableName();
127    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
128    TEST_UTIL.getAdmin().createTable(td);
129    TEST_UTIL.waitTableAvailable(tableName);
130    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(),
131      conf, tableName);
132
133    masterServices = mock(MasterServices.class);
134    when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
135    when(masterServices.getReplicationLogCleanerBarrier())
136      .thenReturn(new ReplicationLogCleanerBarrier());
137    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
138    when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
139    when(rpm.getQueueStorage()).thenReturn(queueStorage);
140    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
141    ServerManager sm = mock(ServerManager.class);
142    when(masterServices.getServerManager()).thenReturn(sm);
143    when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
144    @SuppressWarnings("unchecked")
145    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
146    when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
147    when(procExec.getProcedures()).thenReturn(Collections.emptyList());
148  }
149
150  /**
151   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
152   * oldWALs directory.
153   * <p/>
154   * Created files:
155   * <ul>
156   * <li>2 invalid files</li>
157   * <li>5 old Procedure WALs</li>
158   * <li>30 old WALs from which 3 are in replication</li>
159   * <li>5 recent Procedure WALs</li>
160   * <li>1 recent WAL</li>
161   * <li>1 very new WAL (timestamp in future)</li>
162   * <li>masterProcedureWALs subdirectory</li>
163   * </ul>
164   * Files which should stay:
165   * <ul>
166   * <li>3 replication WALs</li>
167   * <li>2 new WALs</li>
168   * <li>5 latest Procedure WALs</li>
169   * <li>masterProcedureWALs subdirectory</li>
170   * </ul>
171   */
172  @Test
173  public void testLogCleaning() throws Exception {
174    // set TTLs
175    long ttlWAL = 2000;
176    long ttlProcedureWAL = 4000;
177    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
178    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);
179
180    HMaster.decorateMasterConfiguration(conf);
181    Server server = new DummyServer();
182    String fakeMachineName =
183      URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
184
185    final FileSystem fs = FileSystem.get(conf);
186    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
187
188    final long now = EnvironmentEdgeManager.currentTime();
189
190    // Case 1: 2 invalid files, which would be deleted directly
191    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
192    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
193
194    // Case 2: 5 Procedure WALs that are old which would be deleted
195    for (int i = 1; i <= 5; i++) {
196      final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
197      fs.createNewFile(fileName);
198    }
199
200    // Sleep for sometime to get old procedure WALs
201    Thread.sleep(ttlProcedureWAL - ttlWAL);
202
203    // Case 3: old WALs which would be deletable
204    for (int i = 1; i <= 30; i++) {
205      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
206      fs.createNewFile(fileName);
207    }
208    // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
209    masterServices.getReplicationPeerManager().listPeers(null)
210      .add(new ReplicationPeerDescription(peerId, true, null, null));
211    queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
212      new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());
213    // Case 5: 5 Procedure WALs that are new, will stay
214    for (int i = 6; i <= 10; i++) {
215      Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
216      fs.createNewFile(fileName);
217    }
218
219    // Sleep for sometime to get newer modification time
220    Thread.sleep(ttlWAL);
221    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
222
223    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
224    // so we are not going down the chain
225    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
226
227    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
228    LOG.info("File status: {}", Arrays.toString(status));
229
230    // There should be 34 files and 1 masterProcedureWALs directory
231    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
232    // 10 procedure WALs
233    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
234
235    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
236      ImmutableMap.of(HMaster.MASTER, masterServices));
237    cleaner.chore();
238
239    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
240    // are scheduled for replication and masterProcedureWALs directory
241    TEST_UTIL.waitFor(1000,
242      (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length);
243    // In masterProcedureWALs we end up with 5 newer Procedure WALs
244    TEST_UTIL.waitFor(1000,
245      (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
246
247    if (LOG.isDebugEnabled()) {
248      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
249      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
250      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
251      LOG.debug("Kept log file for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs));
252    }
253  }
254
255  @Test
256  public void testOnConfigurationChange() throws Exception {
257    // Prepare environments
258    Server server = new DummyServer();
259
260    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
261    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL,
262      ImmutableMap.of(HMaster.MASTER, masterServices));
263    int size = cleaner.getSizeOfCleaners();
264    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
265      cleaner.getCleanerThreadTimeoutMsec());
266    // Create dir and files for test
267    int numOfFiles = 10;
268    createFiles(fs, OLD_WALS_DIR, numOfFiles);
269    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
270    assertEquals(numOfFiles, status.length);
271    // Start cleaner chore
272    Thread thread = new Thread(() -> cleaner.chore());
273    thread.setDaemon(true);
274    thread.start();
275    // change size of cleaners dynamically
276    int sizeToChange = 4;
277    long threadTimeoutToChange = 30 * 1000L;
278    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
279    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
280    cleaner.onConfigurationChange(conf);
281    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
282    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
283    // Stop chore
284    thread.join();
285    status = fs.listStatus(OLD_WALS_DIR);
286    assertEquals(0, status.length);
287  }
288
289  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
290    for (int i = 0; i < numOfFiles; i++) {
291      // size of each file is 1M, 2M, or 3M
292      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
293      byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)];
294      Bytes.random(M);
295      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
296        fsdos.write(M);
297      }
298    }
299  }
300
301  private static final class DummyServer extends MockServer {
302
303    @Override
304    public Configuration getConfiguration() {
305      return TEST_UTIL.getConfiguration();
306    }
307
308    @Override
309    public ZKWatcher getZooKeeper() {
310      try {
311        return new ZKWatcher(getConfiguration(), "dummy server", this);
312      } catch (IOException e) {
313        e.printStackTrace();
314      }
315      return null;
316    }
317  }
318}