/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
 * <p>
 * Deletions are not performed by the calling thread: {@link #deleteFiles(Iterable)} enqueues one
 * {@code CleanerContext} per file on an internal queue and then waits, with a per-file timeout,
 * for one of the "OldWALsCleaner-N" daemon threads to report the outcome.
 * @see BaseLogCleanerDelegate
 */
@InterfaceAudience.Private
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
  implements ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);

  /** Configuration key for the number of old-WAL cleaner threads. */
  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;

  /** Configuration key for how long (ms) to wait for the deletion of a single file. */
  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
    "hbase.oldwals.cleaner.thread.timeout.msec";
  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;

  /** Files handed over to the cleaner threads and not yet picked up. */
  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
  /** The currently running cleaner threads; replaced when the configured size changes. */
  private List<Thread> oldWALsCleaner;
  /** Maximum time, in milliseconds, {@link #deleteFiles(Iterable)} waits for each file. */
  private long cleanerThreadTimeoutMsec;

  /**
   * Creates the chore and immediately starts the cleaner threads.
   * @param period    the period of time to sleep between each run
   * @param stopper   the stopper
   * @param conf      configuration to use
   * @param fs        handle to the FS
   * @param oldLogDir the path to the archived logs
   * @param pool      the thread pool used to scan directories
   * @param params    handed unchanged to the {@link CleanerChore} constructor
   */
  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
    Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
      pool, params, null);
    this.pendingDelete = new LinkedBlockingQueue<>();
    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    if (size <= 0) {
      // Log BEFORE falling back, so the message reports the value that was actually configured.
      LOG.warn(
        "The configuration {} has been set to an invalid value {}, "
          + "the default value {} will be used.",
        OLD_WALS_CLEANER_THREAD_SIZE, size, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
      size = DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE;
    }
    this.oldWALsCleaner = createOldWalsCleaner(size);
    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
  }

  /**
   * Accepts a file when its name is a valid WAL file name, a valid procedure WAL file name, or
   * ends with {@link MasterRegionFactory#ARCHIVED_WAL_SUFFIX}.
   * @param file the candidate file
   * @return {@code true} if any of the three name checks passes
   */
  @Override
  protected boolean validate(Path file) {
    return AbstractFSWALProvider.validateWALFilename(file.getName())
      || MasterProcedureUtil.validateProcedureWALFilename(file.getName())
      || file.getName().endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
  }

  /**
   * Applies a changed configuration. The per-file timeout is always refreshed. The cleaner threads
   * are only interrupted and recreated when the configured thread count is valid (positive) and
   * differs from the current one.
   * @param conf the new configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    // Refresh the timeout first: it is independent of the thread count and must not be skipped by
    // the early returns below.
    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);

    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    if (newSize <= 0) {
      LOG.debug(
        "The configuration {} has been set to an invalid value {}, "
          + "the previous value {} will be used, no need to update.",
        OLD_WALS_CLEANER_THREAD_SIZE, newSize, oldWALsCleaner.size());
      return;
    }
    if (newSize == oldWALsCleaner.size()) {
      LOG.debug(
        "Size from configuration is the same as previous which " + "is {}, no need to update.",
        newSize);
      return;
    }
    interruptOldWALsCleaner();
    oldWALsCleaner = createOldWalsCleaner(newSize);
  }

  /**
   * Queues the given files for deletion by the cleaner threads and waits for each result.
   * @param filesToDelete the files to delete; {@code null} elements are skipped
   * @return the number of files reported as deleted within the per-file timeout
   */
  @Override
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    List<CleanerContext> results = new ArrayList<>();
    for (FileStatus file : filesToDelete) {
      if (file != null) {
        LOG.trace("Scheduling file {} for deletion", file);
        results.add(new CleanerContext(file));
      }
    }
    if (results.isEmpty()) {
      return 0;
    }

    LOG.debug("Old WALs for delete: {}",
      results.stream().map(cc -> cc.target.getPath().getName()).collect(Collectors.joining(", ")));
    pendingDelete.addAll(results);

    int deletedFiles = 0;
    for (CleanerContext res : results) {
      LOG.trace("Awaiting the results for deletion of old WAL file: {}", res);
      if (res.getResult(this.cleanerThreadTimeoutMsec)) {
        deletedFiles++;
      }
    }
    return deletedFiles;
  }

  /** Runs the superclass cleanup, then interrupts the cleaner threads. */
  @Override
  public synchronized void cleanup() {
    super.cleanup();
    interruptOldWALsCleaner();
  }

  /** Returns the number of cleaner threads currently tracked. */
  int getSizeOfCleaners() {
    return oldWALsCleaner.size();
  }

  /** Returns the per-file wait timeout in milliseconds. */
  long getCleanerThreadTimeoutMsec() {
    return cleanerThreadTimeoutMsec;
  }

  /**
   * Creates and starts {@code size} daemon threads named "OldWALsCleaner-N", each running
   * {@link #deleteFile()}.
   * @param size number of threads to start
   * @return the started threads
   */
  private List<Thread> createOldWalsCleaner(int size) {
    LOG.info("Creating {} old WALs cleaner threads", size);

    // Named differently from the field so the local does not shadow it.
    List<Thread> cleaners = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      Thread cleaner = new Thread(this::deleteFile);
      cleaner.setName("OldWALsCleaner-" + i);
      cleaner.setDaemon(true);
      cleaner.start();
      cleaners.add(cleaner);
    }
    return cleaners;
  }

  /** Interrupts every tracked cleaner thread and forgets them. */
  private void interruptOldWALsCleaner() {
    for (Thread cleaner : oldWALsCleaner) {
      LOG.trace("Interrupting thread: {}", cleaner);
      cleaner.interrupt();
    }
    oldWALsCleaner.clear();
  }

  /**
   * Body of each cleaner thread: repeatedly takes a pending context from the queue, deletes its
   * file (non-recursively) and records the outcome on the context. Returns only when the thread
   * is interrupted while waiting on the queue.
   */
  private void deleteFile() {
    while (true) {
      try {
        final CleanerContext context = pendingDelete.take();
        Preconditions.checkNotNull(context);
        FileStatus oldWalFile = context.getTargetToClean();
        try {
          LOG.debug("Deleting {}", oldWalFile);
          boolean succeed = this.fs.delete(oldWalFile.getPath(), false);
          context.setResult(succeed);
        } catch (IOException e) {
          // fs.delete() fails.
          LOG.warn("Failed to delete old WAL file", e);
          context.setResult(false);
        }
      } catch (InterruptedException ite) {
        // It is most likely from configuration changing request
        LOG.warn(
          "Interrupted while cleaning old WALs, will " + "try to clean it next round. Exiting.");
        // Restore interrupt status
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** Cancels the chore via the superclass, then interrupts the cleaner threads. */
  @Override
  public synchronized void cancel(boolean mayInterruptIfRunning) {
    LOG.debug("Cancelling LogCleaner");
    super.cancel(mayInterruptIfRunning);
    interruptOldWALsCleaner();
  }

  /**
   * Pairs a file to delete with a one-shot latch through which a cleaner thread publishes whether
   * the deletion succeeded.
   */
  private static final class CleanerContext {

    final FileStatus target;
    final AtomicBoolean result;
    final CountDownLatch remainingResults;

    private CleanerContext(FileStatus status) {
      this.target = status;
      this.result = new AtomicBoolean(false);
      this.remainingResults = new CountDownLatch(1);
    }

    /** Records the outcome and releases any thread waiting in {@link #getResult(long)}. */
    void setResult(boolean res) {
      this.result.set(res);
      this.remainingResults.countDown();
    }

    /**
     * Waits for the outcome of the deletion.
     * @param waitIfNotFinished maximum time to wait, in milliseconds
     * @return the recorded outcome, or {@code false} if the wait timed out or was interrupted
     */
    boolean getResult(long waitIfNotFinished) {
      try {
        boolean completed = this.remainingResults.await(waitIfNotFinished, TimeUnit.MILLISECONDS);
        if (!completed) {
          LOG.warn("Spent too much time [{}ms] deleting old WAL file: {}", waitIfNotFinished,
            target);
          return false;
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target);
        // Restore the interrupt status so the caller can observe it; this also makes waits on any
        // remaining files fail fast instead of each blocking for the full timeout.
        Thread.currentThread().interrupt();
        return false;
      }
      return result.get();
    }

    FileStatus getTargetToClean() {
      return target;
    }

    @Override
    public String toString() {
      return "CleanerContext [target=" + target + ", result=" + result + "]";
    }
  }
}