/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * Tests for {@link LogCleaner}: verifies that old WALs and Procedure WALs sharing the same
 * oldWALs directory are cleaned according to their respective TTLs and replication state, and
 * that the cleaner reacts to dynamic configuration changes.
 */
@Tag(MasterTests.TAG)
@Tag(MediumTests.TAG)
public class TestLogsCleaner {

  private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final Path OLD_WALS_DIR =
    new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);

  private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs");

  private static Configuration conf;

  private static DirScanPool POOL;

  // Fixed replication peer id used by testLogCleaning.
  private static final String peerId = "1";

  private MasterServices masterServices;

  private ReplicationQueueStorage queueStorage;

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster();
    POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
    POOL.shutdownNow();
  }

  /**
   * Recreates an empty oldWALs directory, creates a fresh replication queue table named after the
   * running test method, and wires up a mocked {@link MasterServices} (connection, cleaner
   * barrier, peer manager, server manager, procedure executor) backed by that queue storage.
   */
  @BeforeEach
  public void beforeTest(TestInfo testInfo) throws Exception {
    conf = TEST_UTIL.getConfiguration();

    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();

    fs.delete(OLD_WALS_DIR, true);

    // root directory
    fs.mkdirs(OLD_WALS_DIR);

    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
    TEST_UTIL.getAdmin().createTable(td);
    TEST_UTIL.waitTableAvailable(tableName);
    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(),
      conf, tableName);

    masterServices = mock(MasterServices.class);
    when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
    when(masterServices.getReplicationLogCleanerBarrier())
      .thenReturn(new ReplicationLogCleanerBarrier());
    AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class);
    when(masterServices.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
    when(asyncClusterConnection.isClosed()).thenReturn(false);
    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
    when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
    when(rpm.getQueueStorage()).thenReturn(queueStorage);
    // Mutable list: testLogCleaning adds a peer to it through the mock.
    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
    ServerManager sm = mock(ServerManager.class);
    when(masterServices.getServerManager()).thenReturn(sm);
    when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
    @SuppressWarnings("unchecked")
    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
    when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
    when(procExec.getProcedures()).thenReturn(Collections.emptyList());
  }

  /**
   * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
   * oldWALs directory.
   * <p/>
   * Created files:
   * <ul>
   * <li>2 invalid files</li>
   * <li>5 old Procedure WALs</li>
   * <li>30 old WALs from which 3 are in replication</li>
   * <li>5 recent Procedure WALs</li>
   * <li>1 recent WAL</li>
   * <li>1 very new WAL (timestamp in future)</li>
   * <li>masterProcedureWALs subdirectory</li>
   * </ul>
   * Files which should stay:
   * <ul>
   * <li>3 replication WALs</li>
   * <li>2 new WALs</li>
   * <li>5 latest Procedure WALs</li>
   * <li>masterProcedureWALs subdirectory</li>
   * </ul>
   */
  @Test
  public void testLogCleaning() throws Exception {
    // set TTLs
    long ttlWAL = 2000;
    long ttlProcedureWAL = 4000;
    conf.setLong("hbase.master.logcleaner.ttl", ttlWAL);
    conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL);

    HMaster.decorateMasterConfiguration(conf);
    Server server = new DummyServer();
    // Charset overload (Java 10+) avoids the checked UnsupportedEncodingException of the
    // String-charset-name overload.
    String fakeMachineName =
      URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8);

    final FileSystem fs = FileSystem.get(conf);
    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);

    final long now = EnvironmentEdgeManager.currentTime();

    // Case 1: 2 invalid files, which would be deleted directly
    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));

    // Case 2: 5 Procedure WALs that are old which would be deleted
    for (int i = 1; i <= 5; i++) {
      final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
      fs.createNewFile(fileName);
    }

    // Sleep for sometime to get old procedure WALs
    Thread.sleep(ttlProcedureWAL - ttlWAL);

    // Case 3: old WALs which would be deletable
    for (int i = 1; i <= 30; i++) {
      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
      fs.createNewFile(fileName);
    }
    // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
    masterServices.getReplicationPeerManager().listPeers(null)
      .add(new ReplicationPeerDescription(peerId, true, null, null));
    queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
      new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());
    // Case 5: 5 Procedure WALs that are new, will stay
    for (int i = 6; i <= 10; i++) {
      Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
      fs.createNewFile(fileName);
    }

    // Sleep for sometime to get newer modification time
    Thread.sleep(ttlWAL);
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));

    // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
    // so we are not going down the chain
    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));

    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
    LOG.info("File status: {}", Arrays.toString(status));

    // There should be 34 files and 1 masterProcedureWALs directory
    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
    // 10 procedure WALs
    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);

    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
      ImmutableMap.of(HMaster.MASTER, masterServices));
    cleaner.chore();

    // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
    // are scheduled for replication and masterProcedureWALs directory
    TEST_UTIL.waitFor(1000,
      (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length);
    // In masterProcedureWALs we end up with 5 newer Procedure WALs
    TEST_UTIL.waitFor(1000,
      (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);

    if (LOG.isDebugEnabled()) {
      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
      LOG.debug("Kept log file for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs));
    }
  }

  /**
   * Verifies that the number of cleaner threads and their timeout can be changed at runtime via
   * {@link LogCleaner#onConfigurationChange(Configuration)} while a chore is in flight, and that
   * the chore still deletes all eligible files afterwards.
   */
  @Test
  public void testOnConfigurationChange() throws Exception {
    // Prepare environments
    Server server = new DummyServer();

    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL,
      ImmutableMap.of(HMaster.MASTER, masterServices));
    int size = cleaner.getSizeOfCleaners();
    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      cleaner.getCleanerThreadTimeoutMsec());
    // Create dir and files for test
    int numOfFiles = 10;
    createFiles(fs, OLD_WALS_DIR, numOfFiles);
    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
    assertEquals(numOfFiles, status.length);
    // Start cleaner chore
    Thread thread = new Thread(() -> cleaner.chore());
    thread.setDaemon(true);
    thread.start();
    // change size of cleaners dynamically
    int sizeToChange = 4;
    long threadTimeoutToChange = 30 * 1000L;
    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
    cleaner.onConfigurationChange(conf);
    assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners());
    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
    // Stop chore
    thread.join();
    status = fs.listStatus(OLD_WALS_DIR);
    assertEquals(0, status.length);
  }

  /**
   * Creates {@code numOfFiles} files named {@code file-<i>} under {@code parentDir}, each filled
   * with 1, 2, or 3 MB of random bytes.
   */
  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
    for (int i = 0; i < numOfFiles; i++) {
      // size of each file is 1M, 2M, or 3M
      // nextInt(1, 4) already yields 1..3 inclusive; the previous "1 +" made sizes 2..4 MB,
      // contradicting the comment above.
      int xMega = ThreadLocalRandom.current().nextInt(1, 4);
      byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)];
      Bytes.random(M);
      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
        fsdos.write(M);
      }
    }
  }

  /** Minimal {@link Server} backed by the shared test configuration. */
  private static final class DummyServer extends MockServer {

    @Override
    public Configuration getConfiguration() {
      return TEST_UTIL.getConfiguration();
    }

    @Override
    public ZKWatcher getZooKeeper() {
      try {
        return new ZKWatcher(getConfiguration(), "dummy server", this);
      } catch (IOException e) {
        // Fail fast with the cause instead of swallowing the exception and returning null,
        // which previously surfaced as an unrelated NPE at the call site.
        throw new IllegalStateException("Failed to create ZKWatcher", e);
      }
    }
  }
}