/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
 * <p>
 * Deletions are not performed on the calling thread: {@link #deleteFiles(Iterable)} puts one
 * {@link CleanerContext} per file on a queue, a set of daemon "OldWALsCleaner" threads takes them
 * off the queue and issues the file system delete, and the caller then waits (with a timeout) for
 * each outcome.
 * @see BaseLogCleanerDelegate
 */
@InterfaceAudience.Private
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);

  /** Configuration key for the number of threads deleting old WALs. */
  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;

  /** Configuration key for the maximum time to wait for the deletion of a single file. */
  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
      "hbase.oldwals.cleaner.thread.timeout.msec";
  @VisibleForTesting
  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;

  /** Configuration key for the length of each individual wait while polling for a result. */
  public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
      "hbase.oldwals.cleaner.thread.check.interval.msec";
  @VisibleForTesting
  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;

  /** Deletion requests waiting to be picked up by one of the cleaner threads. */
  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
  /** The currently running cleaner threads; replaced when the configured size changes. */
  private List<Thread> oldWALsCleaner;
  private long cleanerThreadTimeoutMsec;
  private long cleanerThreadCheckIntervalMsec;

  /**
   * Creates the chore and immediately starts the configured number of cleaner threads.
   * @param period the period of time to sleep between each run
   * @param stopper the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param oldLogDir the path to the archived logs
   */
  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
      Path oldLogDir) {
    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
    // The queue must exist before the cleaner threads start, since they block on it right away.
    this.pendingDelete = new LinkedBlockingQueue<>();
    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    this.oldWALsCleaner = createOldWalsCleaner(size);
    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
    this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
  }

  /**
   * Accepts a file if its name is a valid WAL file name or a valid procedure WAL file name.
   * @param file the candidate file
   * @return true if the file name passes either validation
   */
  @Override
  protected boolean validate(Path file) {
    return AbstractFSWALProvider.validateWALFilename(file.getName())
        || MasterProcedureUtil.validateProcedureWALFilename(file.getName());
  }

  /**
   * Re-reads the cleaner settings from the given configuration. The timeout and check interval
   * are always refreshed; the cleaner threads are interrupted and recreated only when the
   * configured thread count differs from the number of threads currently held.
   * @param conf the updated configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    super.onConfigurationChange(conf);

    // Refresh the timing settings before the size check below so that a change to them is not
    // dropped when the thread count happens to stay the same.
    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
    cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
      DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);

    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
    if (newSize == oldWALsCleaner.size()) {
      LOG.debug("Size from configuration is the same as previous which is {}, no need to update.",
        newSize);
      return;
    }
    interruptOldWALsCleaner();
    oldWALsCleaner = createOldWalsCleaner(newSize);
  }

  /**
   * Queues every given file for deletion by the cleaner threads and then waits for each outcome.
   * @param filesToDelete the files to delete; null elements are skipped
   * @return the number of files whose deletion was reported as successful before the wait for
   *         that file ended
   */
  @Override
  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
    List<CleanerContext> results = new LinkedList<>();
    for (FileStatus toDelete : filesToDelete) {
      CleanerContext context =
          CleanerContext.createCleanerContext(toDelete, cleanerThreadTimeoutMsec);
      if (context != null) {
        pendingDelete.add(context);
        results.add(context);
      }
    }

    // Everything is queued first so the cleaner threads can work while we wait on each result.
    int deletedFiles = 0;
    for (CleanerContext res : results) {
      if (res.getResult(cleanerThreadCheckIntervalMsec)) {
        deletedFiles++;
      }
    }
    return deletedFiles;
  }

  /** Runs the parent cleanup and then interrupts and forgets all cleaner threads. */
  @Override
  public synchronized void cleanup() {
    super.cleanup();
    interruptOldWALsCleaner();
  }

  @VisibleForTesting
  int getSizeOfCleaners() {
    return oldWALsCleaner.size();
  }

  @VisibleForTesting
  long getCleanerThreadTimeoutMsec() {
    return cleanerThreadTimeoutMsec;
  }

  @VisibleForTesting
  long getCleanerThreadCheckIntervalMsec() {
    return cleanerThreadCheckIntervalMsec;
  }

  /**
   * Creates and starts {@code size} daemon threads named "OldWALsCleaner-N", each running
   * {@link #deleteFile()}.
   * @param size the number of threads to start
   * @return the started threads
   */
  private List<Thread> createOldWalsCleaner(int size) {
    LOG.info("Creating OldWALs cleaners with size={}", size);

    List<Thread> cleaners = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      Thread cleaner = new Thread(this::deleteFile);
      cleaner.setName("OldWALsCleaner-" + i);
      cleaner.setDaemon(true);
      cleaner.start();
      cleaners.add(cleaner);
    }
    return cleaners;
  }

  /** Interrupts every cleaner thread currently held and empties the list. */
  private void interruptOldWALsCleaner() {
    for (Thread cleaner : oldWALsCleaner) {
      cleaner.interrupt();
    }
    oldWALsCleaner.clear();
  }

  /**
   * Body of a cleaner thread: repeatedly takes a request off the queue, deletes the target
   * (non-recursively) and publishes the outcome on the request. The loop ends when the thread is
   * interrupted while waiting for the next request.
   */
  private void deleteFile() {
    while (true) {
      CleanerContext context;
      try {
        context = pendingDelete.take();
      } catch (InterruptedException ite) {
        // It's most likely from configuration changing request. Nothing has been taken off the
        // queue at this point, so there is no result to publish; restore the interrupt status
        // and leave the loop.
        Thread.currentThread().interrupt();
        break;
      }

      boolean succeed = false;
      try {
        FileStatus toClean = context.getTargetToClean();
        succeed = this.fs.delete(toClean.getPath(), false);
      } catch (IOException e) {
        // fs.delete() fails. Pass the exception as the throwable so the stack trace is kept.
        LOG.warn("Failed to clean oldwals with exception: ", e);
      } finally {
        // Always publish an outcome, even on an unexpected exception, so the waiter is woken.
        context.setResult(succeed);
      }
    }
    LOG.debug("Exiting cleaner.");
  }

  /**
   * Cancels the chore and interrupts the cleaner threads. Unlike {@link #cleanup()}, the list of
   * threads is left untouched.
   */
  @Override
  public synchronized void cancel(boolean mayInterruptIfRunning) {
    super.cancel(mayInterruptIfRunning);
    for (Thread t : oldWALsCleaner) {
      t.interrupt();
    }
  }

  /**
   * A single deletion request together with its outcome. A cleaner thread publishes the outcome
   * through {@link #setResult(boolean)}; the requester collects it with {@link #getResult(long)}.
   */
  private static final class CleanerContext {

    final FileStatus target;
    volatile boolean result;
    /** Becomes true once a cleaner thread has published an outcome. */
    volatile boolean setFromCleaner = false;
    /** Upper bound on the total time {@link #getResult(long)} keeps waiting. */
    final long timeoutMsec;

    /**
     * @param status the file to delete
     * @param timeoutMsec the maximum total time to wait for the outcome
     * @return a new context, or null if {@code status} is null
     */
    static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
      return status != null ? new CleanerContext(status, timeoutMsec) : null;
    }

    private CleanerContext(FileStatus status, long timeoutMsec) {
      this.target = status;
      this.result = false;
      this.timeoutMsec = timeoutMsec;
    }

    /**
     * Records the outcome of the deletion and wakes a thread waiting in {@link #getResult(long)}.
     * @param res true if the deletion succeeded
     */
    synchronized void setResult(boolean res) {
      this.result = res;
      this.setFromCleaner = true;
      notify();
    }

    /**
     * Waits until an outcome has been published, the accumulated waiting time reaches
     * {@code timeoutMsec}, or the calling thread is interrupted, whichever comes first. In the
     * timeout and interrupt cases a warning is logged and the current value of the result is
     * returned; the interrupt status is not restored.
     * @param waitIfNotFinished the length in milliseconds of each individual wait
     * @return the recorded outcome, which is false if none has been published
     */
    synchronized boolean getResult(long waitIfNotFinished) {
      long totalTimeMsec = 0;
      try {
        while (!setFromCleaner) {
          long startTimeNanos = System.nanoTime();
          wait(waitIfNotFinished);
          // Accumulate the measured time, since wait() may return early.
          totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
            TimeUnit.NANOSECONDS);
          if (totalTimeMsec >= timeoutMsec) {
            LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
            return result;
          }
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting deletion of " + target);
        return result;
      }
      return result;
    }

    FileStatus getTargetToClean() {
      return target;
    }
  }
}