001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.LinkedBlockingQueue; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.stream.Collectors; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Stoppable; 036import org.apache.hadoop.hbase.conf.ConfigurationObserver; 037import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; 038import org.apache.hadoop.hbase.master.region.MasterRegionFactory; 039import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 045import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 046 047/** 048 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old 049 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise. 050 * @see BaseLogCleanerDelegate 051 */ 052@InterfaceAudience.Private 053public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> 054 implements ConfigurationObserver { 055 private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); 056 057 public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size"; 058 public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2; 059 060 public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 061 "hbase.oldwals.cleaner.thread.timeout.msec"; 062 @VisibleForTesting 063 static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L; 064 065 private final LinkedBlockingQueue<CleanerContext> pendingDelete; 066 private List<Thread> oldWALsCleaner; 067 private long cleanerThreadTimeoutMsec; 068 069 /** 070 * @param period the period of time to sleep between each run 071 * @param stopper the stopper 072 * @param conf configuration to use 073 * @param fs handle to the FS 074 * @param oldLogDir the path to the archived logs 075 * @param pool the thread pool used to scan directories 076 */ 077 public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, 078 Path oldLogDir, DirScanPool pool) { 079 super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS, 080 pool); 081 this.pendingDelete = new LinkedBlockingQueue<>(); 082 int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); 083 this.oldWALsCleaner = createOldWalsCleaner(size); 084 this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 085 DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); 086 } 087 088 @Override 089 protected boolean validate(Path file) { 090 return AbstractFSWALProvider.validateWALFilename(file.getName()) || 091 MasterProcedureUtil.validateProcedureWALFilename(file.getName()) || 092 file.getName().endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX); 093 } 094 095 @Override 096 public void onConfigurationChange(Configuration conf) { 097 int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); 098 if (newSize == oldWALsCleaner.size()) { 099 LOG.debug("Size from configuration is the same as previous which " 100 + "is {}, no need to update.", newSize); 101 return; 102 } 103 interruptOldWALsCleaner(); 104 oldWALsCleaner = createOldWalsCleaner(newSize); 105 cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 106 DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); 107 } 108 109 @Override 110 protected int deleteFiles(Iterable<FileStatus> filesToDelete) { 111 List<CleanerContext> results = new ArrayList<>(); 112 for (FileStatus file : filesToDelete) { 113 LOG.trace("Scheduling file {} for deletion", file); 114 if (file != null) { 115 results.add(new CleanerContext(file)); 116 } 117 } 118 if (results.isEmpty()) { 119 return 0; 120 } 121 122 LOG.debug("Old WALs for delete: {}", 123 results.stream().map(cc -> cc.target.getPath().getName()). 124 collect(Collectors.joining(", "))); 125 pendingDelete.addAll(results); 126 127 int deletedFiles = 0; 128 for (CleanerContext res : results) { 129 LOG.trace("Awaiting the results for deletion of old WAL file: {}", res); 130 deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0; 131 } 132 return deletedFiles; 133 } 134 135 @Override 136 public synchronized void cleanup() { 137 super.cleanup(); 138 interruptOldWALsCleaner(); 139 } 140 141 @VisibleForTesting 142 int getSizeOfCleaners() { 143 return oldWALsCleaner.size(); 144 } 145 146 @VisibleForTesting 147 long getCleanerThreadTimeoutMsec() { 148 return cleanerThreadTimeoutMsec; 149 } 150 151 private List<Thread> createOldWalsCleaner(int size) { 152 LOG.info("Creating {} old WALs cleaner threads", size); 153 154 List<Thread> oldWALsCleaner = new ArrayList<>(size); 155 for (int i = 0; i < size; i++) { 156 Thread cleaner = new Thread(() -> deleteFile()); 157 cleaner.setName("OldWALsCleaner-" + i); 158 cleaner.setDaemon(true); 159 cleaner.start(); 160 oldWALsCleaner.add(cleaner); 161 } 162 return oldWALsCleaner; 163 } 164 165 private void interruptOldWALsCleaner() { 166 for (Thread cleaner : oldWALsCleaner) { 167 LOG.trace("Interrupting thread: {}", cleaner); 168 cleaner.interrupt(); 169 } 170 oldWALsCleaner.clear(); 171 } 172 173 private void deleteFile() { 174 while (true) { 175 try { 176 final CleanerContext context = pendingDelete.take(); 177 Preconditions.checkNotNull(context); 178 FileStatus oldWalFile = context.getTargetToClean(); 179 try { 180 LOG.debug("Deleting {}", oldWalFile); 181 boolean succeed = this.fs.delete(oldWalFile.getPath(), false); 182 context.setResult(succeed); 183 } catch (IOException e) { 184 // fs.delete() fails. 185 LOG.warn("Failed to delete old WAL file", e); 186 context.setResult(false); 187 } 188 } catch (InterruptedException ite) { 189 // It is most likely from configuration changing request 190 LOG.warn("Interrupted while cleaning old WALs, will " 191 + "try to clean it next round. Exiting."); 192 // Restore interrupt status 193 Thread.currentThread().interrupt(); 194 return; 195 } 196 LOG.trace("Exiting"); 197 } 198 } 199 200 @Override 201 public synchronized void cancel(boolean mayInterruptIfRunning) { 202 LOG.debug("Cancelling LogCleaner"); 203 super.cancel(mayInterruptIfRunning); 204 interruptOldWALsCleaner(); 205 } 206 207 private static final class CleanerContext { 208 209 final FileStatus target; 210 final AtomicBoolean result; 211 final CountDownLatch remainingResults; 212 213 private CleanerContext(FileStatus status) { 214 this.target = status; 215 this.result = new AtomicBoolean(false); 216 this.remainingResults = new CountDownLatch(1); 217 } 218 219 void setResult(boolean res) { 220 this.result.set(res); 221 this.remainingResults.countDown(); 222 } 223 224 boolean getResult(long waitIfNotFinished) { 225 try { 226 boolean completed = this.remainingResults.await(waitIfNotFinished, 227 TimeUnit.MILLISECONDS); 228 if (!completed) { 229 LOG.warn("Spent too much time [{}ms] deleting old WAL file: {}", 230 waitIfNotFinished, target); 231 return false; 232 } 233 } catch (InterruptedException e) { 234 LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target); 235 return false; 236 } 237 return result.get(); 238 } 239 240 FileStatus getTargetToClean() { 241 return target; 242 } 243 244 @Override 245 public String toString() { 246 return "CleanerContext [target=" + target + ", result=" + result + "]"; 247 } 248 } 249}