/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
 * <p>
 * The actual file deletions are performed by a set of daemon threads ("OldWALsCleaner-N") that
 * consume {@link CleanerContext} requests from a shared queue; {@link #deleteFiles(Iterable)}
 * enqueues the requests and then waits, with a per-file timeout, for each outcome.
 * @see BaseLogCleanerDelegate
 */
@InterfaceAudience.Private
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
  implements ConfigurationObserver {

  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);

  /** Configuration key: number of threads that delete old WAL files. */
  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;

  /** Configuration key: maximum time, in milliseconds, to wait for a single file deletion. */
  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
    "hbase.oldwals.cleaner.thread.timeout.msec";
  @VisibleForTesting
  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;

  /** Configuration key: length, in milliseconds, of each wait slice while polling a result. */
  public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
    "hbase.oldwals.cleaner.thread.check.interval.msec";
  @VisibleForTesting
  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;

  /** Deletion requests waiting to be picked up by one of the cleaner threads. */
  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
  /** The currently running cleaner threads; replaced when the configured size changes. */
  private List<Thread> oldWALsCleaner;
  private long cleanerThreadTimeoutMsec;
  private long cleanerThreadCheckIntervalMsec;

  /**
   * @param period the period of time to sleep between each run
   * @param stopper the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldLogDir the path to the archived logs
   * @param pool the thread pool used to scan directories
   */
  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
    Path oldLogDir, DirScanPool pool) {
    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
      pool);
    this.pendingDelete = new LinkedBlockingQueue<>();
    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
    this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
    // Start the worker threads last so they never run against a partially initialized instance.
    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    this.oldWALsCleaner = createOldWalsCleaner(size);
  }

  /**
   * Accepts a file when its name is a valid WAL file name or a valid procedure WAL file name.
   * @param file the candidate file
   * @return true if the name passes either filename check
   */
  @Override
  protected boolean validate(Path file) {
    return AbstractFSWALProvider.validateWALFilename(file.getName())
      || MasterProcedureUtil.validateProcedureWALFilename(file.getName());
  }

  /**
   * Re-reads the cleaner settings from {@code conf}.
   * <p>
   * The wait timeout and check interval are always refreshed. The cleaner threads are only
   * replaced when the configured thread count differs from the number currently running.
   * @param conf the updated configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    // Refresh the wait settings unconditionally: they may change while the size does not.
    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
    cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);

    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    if (newSize == oldWALsCleaner.size()) {
      LOG.debug("Size from configuration is the same as previous which is {}, no need to update.",
        newSize);
      return;
    }
    // Build the replacement threads before retiring the old ones, so that a failure while
    // creating them (e.g. an invalid size) does not leave this chore without any cleaner.
    List<Thread> replacement = createOldWalsCleaner(newSize);
    interruptOldWALsCleaner();
    oldWALsCleaner = replacement;
  }

  /**
   * Queues every given file for deletion by the cleaner threads and waits for the outcomes.
   * <p>
   * Each file is waited on for at most the configured timeout. If the calling thread is
   * interrupted while waiting, its interrupt status is preserved, so the remaining unfinished
   * files are not waited on and are counted as not deleted.
   * @param filesToDelete the files to delete; null elements are skipped
   * @return the number of files reported as successfully deleted
   */
  @Override
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    List<CleanerContext> results = new LinkedList<>();
    for (FileStatus toDelete : filesToDelete) {
      CleanerContext context =
        CleanerContext.createCleanerContext(toDelete, cleanerThreadTimeoutMsec);
      if (context != null) {
        pendingDelete.add(context);
        results.add(context);
      }
    }

    int deletedFiles = 0;
    for (CleanerContext res : results) {
      if (res.getResult(cleanerThreadCheckIntervalMsec)) {
        deletedFiles++;
      }
    }
    return deletedFiles;
  }

  /** Runs the superclass cleanup, then interrupts and forgets all cleaner threads. */
  @Override
  public synchronized void cleanup() {
    super.cleanup();
    interruptOldWALsCleaner();
  }

  @VisibleForTesting
  int getSizeOfCleaners() {
    return oldWALsCleaner.size();
  }

  @VisibleForTesting
  long getCleanerThreadTimeoutMsec() {
    return cleanerThreadTimeoutMsec;
  }

  @VisibleForTesting
  long getCleanerThreadCheckIntervalMsec() {
    return cleanerThreadCheckIntervalMsec;
  }

  /**
   * Creates and starts {@code size} daemon threads, each running {@link #deleteFile()}.
   * @param size number of threads to start
   * @return the started threads
   */
  private List<Thread> createOldWalsCleaner(int size) {
    LOG.info("Creating OldWALs cleaners with size={}", size);

    List<Thread> cleaners = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      Thread cleaner = new Thread(this::deleteFile);
      cleaner.setName("OldWALsCleaner-" + i);
      cleaner.setDaemon(true);
      cleaner.start();
      cleaners.add(cleaner);
    }
    return cleaners;
  }

  /** Interrupts every current cleaner thread and empties the list that tracks them. */
  private void interruptOldWALsCleaner() {
    for (Thread cleaner : oldWALsCleaner) {
      cleaner.interrupt();
    }
    oldWALsCleaner.clear();
  }

  /**
   * Body of a cleaner thread: repeatedly takes a request from the queue, deletes the target file
   * (non-recursively) and publishes the outcome to the request. The loop ends when the thread is
   * interrupted while waiting for work, which is how the thread is asked to stop.
   */
  private void deleteFile() {
    while (true) {
      CleanerContext context;
      try {
        // take() blocks until a request is available and never returns null.
        context = pendingDelete.take();
      } catch (InterruptedException ite) {
        // It's most likely from configuration changing request; restore interrupt status.
        Thread.currentThread().interrupt();
        break;
      }

      boolean succeed = false;
      try {
        FileStatus toClean = context.getTargetToClean();
        succeed = this.fs.delete(toClean.getPath(), false);
      } catch (IOException e) {
        // fs.delete() fails. Pass the exception itself so the stack trace is logged.
        LOG.warn("Failed to clean oldwals with exception: ", e);
      } finally {
        // Always publish an outcome so the waiter is released even on an unexpected failure.
        context.setResult(succeed);
      }
    }
    LOG.debug("Exiting cleaner.");
  }

  /**
   * Cancels this chore via the superclass and interrupts the cleaner threads. Unlike
   * {@link #cleanup()}, the list of threads is left untouched.
   */
  @Override
  public synchronized void cancel(boolean mayInterruptIfRunning) {
    super.cancel(mayInterruptIfRunning);
    for (Thread t : oldWALsCleaner) {
      t.interrupt();
    }
  }

  /**
   * A single deletion request together with its outcome. The thread that requested the deletion
   * waits in {@link #getResult(long)}; a cleaner thread reports through
   * {@link #setResult(boolean)}.
   */
  private static final class CleanerContext {

    final FileStatus target;
    volatile boolean result;
    volatile boolean setFromCleaner = false;
    final long timeoutMsec;

    /**
     * @return a new context for {@code status}, or null when {@code status} is null
     */
    static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
      return status != null ? new CleanerContext(status, timeoutMsec) : null;
    }

    private CleanerContext(FileStatus status, long timeoutMsec) {
      this.target = status;
      this.result = false;
      this.timeoutMsec = timeoutMsec;
    }

    /** Records the outcome of the deletion and wakes up any thread waiting for it. */
    synchronized void setResult(boolean res) {
      this.result = res;
      this.setFromCleaner = true;
      notifyAll();
    }

    /**
     * Waits until a cleaner thread has reported the outcome, the timeout given at construction
     * has elapsed, or the calling thread is interrupted.
     * @param waitIfNotFinished length in milliseconds of each individual wait slice
     * @return the reported outcome, or the value recorded so far (initially false) if the wait
     *         ended because of a timeout or an interrupt
     */
    synchronized boolean getResult(long waitIfNotFinished) {
      final long startTimeNanos = System.nanoTime();
      try {
        while (!setFromCleaner) {
          wait(waitIfNotFinished);
          long totalTimeMsec = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
          if (totalTimeMsec >= timeoutMsec) {
            LOG.warn("Spend too much time {} ms to delete oldwals {}", totalTimeMsec, target);
            return result;
          }
        }
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller so it can stop waiting on further files.
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while waiting deletion of {}", target);
      }
      return result;
    }

    FileStatus getTargetToClean() {
      return target;
    }
  }
}