001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.LinkedBlockingQueue; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicBoolean; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Stoppable; 034import org.apache.hadoop.hbase.conf.ConfigurationObserver; 035import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; 036import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 043 044/** 045 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old 046 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise. 047 * @see BaseLogCleanerDelegate 048 */ 049@InterfaceAudience.Private 050public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> 051 implements ConfigurationObserver { 052 private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); 053 054 public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size"; 055 public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2; 056 057 public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 058 "hbase.oldwals.cleaner.thread.timeout.msec"; 059 @VisibleForTesting 060 static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L; 061 062 private final LinkedBlockingQueue<CleanerContext> pendingDelete; 063 private List<Thread> oldWALsCleaner; 064 private long cleanerThreadTimeoutMsec; 065 066 /** 067 * @param period the period of time to sleep between each run 068 * @param stopper the stopper 069 * @param conf configuration to use 070 * @param fs handle to the FS 071 * @param oldLogDir the path to the archived logs 072 * @param pool the thread pool used to scan directories 073 */ 074 public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, 075 Path oldLogDir, DirScanPool pool) { 076 super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS, 077 pool); 078 this.pendingDelete = new LinkedBlockingQueue<>(); 079 int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); 080 this.oldWALsCleaner = createOldWalsCleaner(size); 081 this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 082 DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); 083 } 084 085 @Override 086 protected boolean validate(Path file) { 087 return AbstractFSWALProvider.validateWALFilename(file.getName()) 088 || MasterProcedureUtil.validateProcedureWALFilename(file.getName()); 089 } 090 091 @Override 092 public void onConfigurationChange(Configuration conf) { 093 int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); 094 if (newSize == oldWALsCleaner.size()) { 095 LOG.debug("Size from configuration is the same as previous which " 096 + "is {}, no need to update.", newSize); 097 return; 098 } 099 interruptOldWALsCleaner(); 100 oldWALsCleaner = createOldWalsCleaner(newSize); 101 cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 102 DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); 103 } 104 105 @Override 106 protected int deleteFiles(Iterable<FileStatus> filesToDelete) { 107 List<CleanerContext> results = new ArrayList<>(); 108 for (FileStatus file : filesToDelete) { 109 LOG.trace("Scheduling file {} for deletion", file); 110 if (file != null) { 111 results.add(new CleanerContext(file)); 112 } 113 } 114 115 LOG.debug("Old WAL files pending deletion: {}", results); 116 pendingDelete.addAll(results); 117 118 int deletedFiles = 0; 119 for (CleanerContext res : results) { 120 LOG.trace("Awaiting the results for deletion of old WAL file: {}", res); 121 deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0; 122 } 123 return deletedFiles; 124 } 125 126 @Override 127 public synchronized void cleanup() { 128 super.cleanup(); 129 interruptOldWALsCleaner(); 130 } 131 132 @VisibleForTesting 133 int getSizeOfCleaners() { 134 return oldWALsCleaner.size(); 135 } 136 137 @VisibleForTesting 138 long getCleanerThreadTimeoutMsec() { 139 return cleanerThreadTimeoutMsec; 140 } 141 142 private List<Thread> createOldWalsCleaner(int size) { 143 LOG.info("Creating {} OldWALs cleaner threads", size); 144 145 List<Thread> oldWALsCleaner = new ArrayList<>(size); 146 for (int i = 0; i < size; i++) { 147 Thread cleaner = new Thread(() -> deleteFile()); 148 cleaner.setName("OldWALsCleaner-" + i); 149 cleaner.setDaemon(true); 150 cleaner.start(); 151 oldWALsCleaner.add(cleaner); 152 } 153 return oldWALsCleaner; 154 } 155 156 private void interruptOldWALsCleaner() { 157 for (Thread cleaner : oldWALsCleaner) { 158 LOG.trace("Interrupting thread: {}", cleaner); 159 cleaner.interrupt(); 160 } 161 oldWALsCleaner.clear(); 162 } 163 164 private void deleteFile() { 165 while (true) { 166 try { 167 final CleanerContext context = pendingDelete.take(); 168 Preconditions.checkNotNull(context); 169 FileStatus oldWalFile = context.getTargetToClean(); 170 try { 171 LOG.debug("Attempting to delete old WAL file: {}", oldWalFile); 172 boolean succeed = this.fs.delete(oldWalFile.getPath(), false); 173 context.setResult(succeed); 174 } catch (IOException e) { 175 // fs.delete() fails. 176 LOG.warn("Failed to clean old WAL file", e); 177 context.setResult(false); 178 } 179 } catch (InterruptedException ite) { 180 // It is most likely from configuration changing request 181 LOG.warn("Interrupted while cleaning old WALs, will " 182 + "try to clean it next round. Exiting."); 183 // Restore interrupt status 184 Thread.currentThread().interrupt(); 185 return; 186 } 187 LOG.debug("Exiting"); 188 } 189 } 190 191 @Override 192 public synchronized void cancel(boolean mayInterruptIfRunning) { 193 LOG.debug("Cancelling LogCleaner"); 194 super.cancel(mayInterruptIfRunning); 195 interruptOldWALsCleaner(); 196 } 197 198 private static final class CleanerContext { 199 200 final FileStatus target; 201 final AtomicBoolean result; 202 final CountDownLatch remainingResults; 203 204 private CleanerContext(FileStatus status) { 205 this.target = status; 206 this.result = new AtomicBoolean(false); 207 this.remainingResults = new CountDownLatch(1); 208 } 209 210 void setResult(boolean res) { 211 this.result.set(res); 212 this.remainingResults.countDown(); 213 } 214 215 boolean getResult(long waitIfNotFinished) { 216 try { 217 boolean completed = this.remainingResults.await(waitIfNotFinished, 218 TimeUnit.MILLISECONDS); 219 if (!completed) { 220 LOG.warn("Spend too much time [{}ms] to delete old WAL file: {}", 221 waitIfNotFinished, target); 222 return false; 223 } 224 } catch (InterruptedException e) { 225 LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target); 226 return false; 227 } 228 return result.get(); 229 } 230 231 FileStatus getTargetToClean() { 232 return target; 233 } 234 235 @Override 236 public String toString() { 237 return "CleanerContext [target=" + target + ", result=" + result + "]"; 238 } 239 } 240}