001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import static org.junit.Assert.assertEquals; 021import static org.mockito.Mockito.mock; 022import static org.mockito.Mockito.when; 023 024import java.io.IOException; 025import java.net.URLEncoder; 026import java.nio.charset.StandardCharsets; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.concurrent.ThreadLocalRandom; 031import org.apache.commons.io.FileUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataOutputStream; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.Server; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.TableNameTestRule; 043import org.apache.hadoop.hbase.Waiter; 044import org.apache.hadoop.hbase.client.AsyncClusterConnection; 045import org.apache.hadoop.hbase.client.TableDescriptor; 046import org.apache.hadoop.hbase.master.HMaster; 047import org.apache.hadoop.hbase.master.MasterServices; 048import org.apache.hadoop.hbase.master.ServerManager; 049import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 050import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; 051import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 052import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 053import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 054import org.apache.hadoop.hbase.replication.ReplicationQueueId; 055import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 056import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 057import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; 058import org.apache.hadoop.hbase.testclassification.MasterTests; 059import org.apache.hadoop.hbase.testclassification.MediumTests; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 062import org.apache.hadoop.hbase.util.MockServer; 063import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 064import org.junit.AfterClass; 065import org.junit.Before; 066import org.junit.BeforeClass; 067import org.junit.ClassRule; 068import org.junit.Rule; 069import org.junit.Test; 070import org.junit.experimental.categories.Category; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 075 076@Category({ MasterTests.class, MediumTests.class }) 077public class TestLogsCleaner { 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestLogsCleaner.class); 082 083 private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class); 084 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 085 086 private static final Path OLD_WALS_DIR = 087 new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); 088 089 private static final Path OLD_PROCEDURE_WALS_DIR = new Path(OLD_WALS_DIR, "masterProcedureWALs"); 090 091 private static Configuration conf; 092 093 private static DirScanPool POOL; 094 095 private static String peerId = "1"; 096 097 private MasterServices masterServices; 098 099 private ReplicationQueueStorage queueStorage; 100 101 @Rule 102 public final TableNameTestRule tableNameRule = new TableNameTestRule(); 103 104 @BeforeClass 105 public static void setUpBeforeClass() throws Exception { 106 TEST_UTIL.startMiniCluster(); 107 POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration()); 108 } 109 110 @AfterClass 111 public static void tearDownAfterClass() throws Exception { 112 TEST_UTIL.shutdownMiniCluster(); 113 POOL.shutdownNow(); 114 } 115 116 @Before 117 public void beforeTest() throws Exception { 118 conf = TEST_UTIL.getConfiguration(); 119 120 FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); 121 122 fs.delete(OLD_WALS_DIR, true); 123 124 // root directory 125 fs.mkdirs(OLD_WALS_DIR); 126 127 TableName tableName = tableNameRule.getTableName(); 128 TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); 129 TEST_UTIL.getAdmin().createTable(td); 130 TEST_UTIL.waitTableAvailable(tableName); 131 queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), 132 conf, tableName); 133 134 masterServices = mock(MasterServices.class); 135 when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection()); 136 when(masterServices.getReplicationLogCleanerBarrier()) 137 .thenReturn(new ReplicationLogCleanerBarrier()); 138 AsyncClusterConnection asyncClusterConnection = mock(AsyncClusterConnection.class); 139 when(masterServices.getAsyncClusterConnection()).thenReturn(asyncClusterConnection); 140 when(asyncClusterConnection.isClosed()).thenReturn(false); 141 ReplicationPeerManager rpm = mock(ReplicationPeerManager.class); 142 when(masterServices.getReplicationPeerManager()).thenReturn(rpm); 143 when(rpm.getQueueStorage()).thenReturn(queueStorage); 144 when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); 145 ServerManager sm = mock(ServerManager.class); 146 when(masterServices.getServerManager()).thenReturn(sm); 147 when(sm.getOnlineServersList()).thenReturn(Collections.emptyList()); 148 @SuppressWarnings("unchecked") 149 ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class); 150 when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec); 151 when(procExec.getProcedures()).thenReturn(Collections.emptyList()); 152 } 153 154 /** 155 * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same 156 * oldWALs directory. 157 * <p/> 158 * Created files: 159 * <ul> 160 * <li>2 invalid files</li> 161 * <li>5 old Procedure WALs</li> 162 * <li>30 old WALs from which 3 are in replication</li> 163 * <li>5 recent Procedure WALs</li> 164 * <li>1 recent WAL</li> 165 * <li>1 very new WAL (timestamp in future)</li> 166 * <li>masterProcedureWALs subdirectory</li> 167 * </ul> 168 * Files which should stay: 169 * <ul> 170 * <li>3 replication WALs</li> 171 * <li>2 new WALs</li> 172 * <li>5 latest Procedure WALs</li> 173 * <li>masterProcedureWALs subdirectory</li> 174 * </ul> 175 */ 176 @Test 177 public void testLogCleaning() throws Exception { 178 // set TTLs 179 long ttlWAL = 2000; 180 long ttlProcedureWAL = 4000; 181 conf.setLong("hbase.master.logcleaner.ttl", ttlWAL); 182 conf.setLong("hbase.master.procedurewalcleaner.ttl", ttlProcedureWAL); 183 184 HMaster.decorateMasterConfiguration(conf); 185 Server server = new DummyServer(); 186 String fakeMachineName = 187 URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name()); 188 189 final FileSystem fs = FileSystem.get(conf); 190 fs.mkdirs(OLD_PROCEDURE_WALS_DIR); 191 192 final long now = EnvironmentEdgeManager.currentTime(); 193 194 // Case 1: 2 invalid files, which would be deleted directly 195 fs.createNewFile(new Path(OLD_WALS_DIR, "a")); 196 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a")); 197 198 // Case 2: 5 Procedure WALs that are old which would be deleted 199 for (int i = 1; i <= 5; i++) { 200 final Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); 201 fs.createNewFile(fileName); 202 } 203 204 // Sleep for sometime to get old procedure WALs 205 Thread.sleep(ttlProcedureWAL - ttlWAL); 206 207 // Case 3: old WALs which would be deletable 208 for (int i = 1; i <= 30; i++) { 209 Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i)); 210 fs.createNewFile(fileName); 211 } 212 // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset 213 masterServices.getReplicationPeerManager().listPeers(null) 214 .add(new ReplicationPeerDescription(peerId, true, null, null)); 215 queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName, 216 new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap()); 217 // Case 5: 5 Procedure WALs that are new, will stay 218 for (int i = 6; i <= 10; i++) { 219 Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); 220 fs.createNewFile(fileName); 221 } 222 223 // Sleep for sometime to get newer modification time 224 Thread.sleep(ttlWAL); 225 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now)); 226 227 // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner, 228 // so we are not going down the chain 229 fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL))); 230 231 FileStatus[] status = fs.listStatus(OLD_WALS_DIR); 232 LOG.info("File status: {}", Arrays.toString(status)); 233 234 // There should be 34 files and 1 masterProcedureWALs directory 235 assertEquals(35, fs.listStatus(OLD_WALS_DIR).length); 236 // 10 procedure WALs 237 assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); 238 239 LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, 240 ImmutableMap.of(HMaster.MASTER, masterServices)); 241 cleaner.chore(); 242 243 // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which 244 // are scheduled for replication and masterProcedureWALs directory 245 TEST_UTIL.waitFor(1000, 246 (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(OLD_WALS_DIR).length); 247 // In masterProcedureWALs we end up with 5 newer Procedure WALs 248 TEST_UTIL.waitFor(1000, 249 (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); 250 251 if (LOG.isDebugEnabled()) { 252 FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR); 253 FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR); 254 LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs)); 255 LOG.debug("Kept log file for masterProcedureWALs: {}", Arrays.toString(statusProcedureWALs)); 256 } 257 } 258 259 @Test 260 public void testOnConfigurationChange() throws Exception { 261 // Prepare environments 262 Server server = new DummyServer(); 263 264 FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); 265 LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, 266 ImmutableMap.of(HMaster.MASTER, masterServices)); 267 int size = cleaner.getSizeOfCleaners(); 268 assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 269 cleaner.getCleanerThreadTimeoutMsec()); 270 // Create dir and files for test 271 int numOfFiles = 10; 272 createFiles(fs, OLD_WALS_DIR, numOfFiles); 273 FileStatus[] status = fs.listStatus(OLD_WALS_DIR); 274 assertEquals(numOfFiles, status.length); 275 // Start cleaner chore 276 Thread thread = new Thread(() -> cleaner.chore()); 277 thread.setDaemon(true); 278 thread.start(); 279 // change size of cleaners dynamically 280 int sizeToChange = 4; 281 long threadTimeoutToChange = 30 * 1000L; 282 conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, size + sizeToChange); 283 conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange); 284 cleaner.onConfigurationChange(conf); 285 assertEquals(sizeToChange + size, cleaner.getSizeOfCleaners()); 286 assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec()); 287 // Stop chore 288 thread.join(); 289 status = fs.listStatus(OLD_WALS_DIR); 290 assertEquals(0, status.length); 291 } 292 293 private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { 294 for (int i = 0; i < numOfFiles; i++) { 295 // size of each file is 1M, 2M, or 3M 296 int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4); 297 byte[] M = new byte[Math.toIntExact(FileUtils.ONE_MB * xMega)]; 298 Bytes.random(M); 299 try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { 300 fsdos.write(M); 301 } 302 } 303 } 304 305 private static final class DummyServer extends MockServer { 306 307 @Override 308 public Configuration getConfiguration() { 309 return TEST_UTIL.getConfiguration(); 310 } 311 312 @Override 313 public ZKWatcher getZooKeeper() { 314 try { 315 return new ZKWatcher(getConfiguration(), "dummy server", this); 316 } catch (IOException e) { 317 e.printStackTrace(); 318 } 319 return null; 320 } 321 } 322}