001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Comparator; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicLong; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileStatus; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Stoppable; 033import org.apache.hadoop.hbase.conf.ConfigurationObserver; 034import org.apache.hadoop.hbase.io.HFileLink; 035import org.apache.hadoop.hbase.master.region.MasterRegionFactory; 036import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 037import org.apache.hadoop.hbase.util.StealJobQueue; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * This Chore, every time it runs, will clear the HFiles in the hfile archive 044 * folder that are deletable for each HFile cleaner in the chain. 045 */ 046@InterfaceAudience.Private 047public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> 048 implements ConfigurationObserver { 049 050 public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins"; 051 052 public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, 053 Path directory, DirScanPool pool) { 054 this(period, stopper, conf, fs, directory, pool, null); 055 } 056 057 // Configuration key for large/small throttle point 058 public final static String HFILE_DELETE_THROTTLE_THRESHOLD = 059 "hbase.regionserver.thread.hfilecleaner.throttle"; 060 public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M 061 062 // Configuration key for large queue initial size 063 public final static String LARGE_HFILE_QUEUE_INIT_SIZE = 064 "hbase.regionserver.hfilecleaner.large.queue.size"; 065 public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240; 066 067 // Configuration key for small queue initial size 068 public final static String SMALL_HFILE_QUEUE_INIT_SIZE = 069 "hbase.regionserver.hfilecleaner.small.queue.size"; 070 public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240; 071 072 // Configuration key for large file delete thread number 073 public final static String LARGE_HFILE_DELETE_THREAD_NUMBER = 074 "hbase.regionserver.hfilecleaner.large.thread.count"; 075 public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1; 076 077 // Configuration key for small file delete thread number 078 public final static String SMALL_HFILE_DELETE_THREAD_NUMBER = 079 "hbase.regionserver.hfilecleaner.small.thread.count"; 080 public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1; 081 082 public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC = 083 "hbase.regionserver.hfilecleaner.thread.timeout.msec"; 084 static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L; 085 086 public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 087 "hbase.regionserver.hfilecleaner.thread.check.interval.msec"; 088 static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L; 089 090 private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class); 091 092 StealJobQueue<HFileDeleteTask> largeFileQueue; 093 BlockingQueue<HFileDeleteTask> smallFileQueue; 094 private int throttlePoint; 095 private int largeQueueInitSize; 096 private int smallQueueInitSize; 097 private int largeFileDeleteThreadNumber; 098 private int smallFileDeleteThreadNumber; 099 private long cleanerThreadTimeoutMsec; 100 private long cleanerThreadCheckIntervalMsec; 101 private List<Thread> threads = new ArrayList<Thread>(); 102 private volatile boolean running; 103 104 private AtomicLong deletedLargeFiles = new AtomicLong(); 105 private AtomicLong deletedSmallFiles = new AtomicLong(); 106 107 /** 108 * @param period the period of time to sleep between each run 109 * @param stopper the stopper 110 * @param conf configuration to use 111 * @param fs handle to the FS 112 * @param directory directory to be cleaned 113 * @param pool the thread pool used to scan directories 114 * @param params params could be used in subclass of BaseHFileCleanerDelegate 115 */ 116 public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, 117 Path directory, DirScanPool pool, Map<String, Object> params) { 118 this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool, 119 params); 120 121 } 122 123 /** 124 * For creating customized HFileCleaner. 125 * @param name name of the chore being run 126 * @param period the period of time to sleep between each run 127 * @param stopper the stopper 128 * @param conf configuration to use 129 * @param fs handle to the FS 130 * @param directory directory to be cleaned 131 * @param confKey configuration key for the classes to instantiate 132 * @param pool the thread pool used to scan directories 133 * @param params params could be used in subclass of BaseHFileCleanerDelegate 134 */ 135 public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs, 136 Path directory, String confKey, DirScanPool pool, Map<String, Object> params) { 137 super(name, period, stopper, conf, fs, directory, confKey, pool, params); 138 throttlePoint = 139 conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); 140 largeQueueInitSize = 141 conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); 142 smallQueueInitSize = 143 conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); 144 largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR); 145 smallFileQueue = largeFileQueue.getStealFromQueue(); 146 largeFileDeleteThreadNumber = 147 conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); 148 smallFileDeleteThreadNumber = 149 conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); 150 cleanerThreadTimeoutMsec = 151 conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC); 152 cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC, 153 DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC); 154 startHFileDeleteThreads(); 155 } 156 157 @Override 158 protected boolean validate(Path file) { 159 return HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent()) || 160 StoreFileInfo.validateStoreFileName(file.getName()) || 161 file.getName().endsWith(MasterRegionFactory.ARCHIVED_HFILE_SUFFIX); 162 } 163 164 /** 165 * Exposed for TESTING! 166 */ 167 public List<BaseHFileCleanerDelegate> getDelegatesForTesting() { 168 return this.cleanersChain; 169 } 170 171 @Override 172 public int deleteFiles(Iterable<FileStatus> filesToDelete) { 173 int deletedFiles = 0; 174 List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>(); 175 // construct delete tasks and add into relative queue 176 for (FileStatus file : filesToDelete) { 177 HFileDeleteTask task = deleteFile(file); 178 if (task != null) { 179 tasks.add(task); 180 } 181 } 182 // wait for each submitted task to finish 183 for (HFileDeleteTask task : tasks) { 184 if (task.getResult(cleanerThreadCheckIntervalMsec)) { 185 deletedFiles++; 186 } 187 } 188 return deletedFiles; 189 } 190 191 /** 192 * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue 193 * @param file the file to delete 194 * @return HFileDeleteTask to track progress 195 */ 196 private HFileDeleteTask deleteFile(FileStatus file) { 197 HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec); 198 boolean enqueued = dispatch(task); 199 return enqueued ? task : null; 200 } 201 202 private boolean dispatch(HFileDeleteTask task) { 203 if (task.fileLength >= this.throttlePoint) { 204 if (!this.largeFileQueue.offer(task)) { 205 // should never arrive here as long as we use PriorityQueue 206 LOG.trace("Large file deletion queue is full"); 207 return false; 208 } 209 } else { 210 if (!this.smallFileQueue.offer(task)) { 211 // should never arrive here as long as we use PriorityQueue 212 LOG.trace("Small file deletion queue is full"); 213 return false; 214 } 215 } 216 return true; 217 } 218 219 @Override 220 public synchronized void cleanup() { 221 super.cleanup(); 222 stopHFileDeleteThreads(); 223 } 224 225 /** 226 * Start threads for hfile deletion 227 */ 228 private void startHFileDeleteThreads() { 229 final String n = Thread.currentThread().getName(); 230 running = true; 231 // start thread for large file deletion 232 for (int i = 0; i < largeFileDeleteThreadNumber; i++) { 233 Thread large = new Thread() { 234 @Override 235 public void run() { 236 consumerLoop(largeFileQueue); 237 } 238 }; 239 large.setDaemon(true); 240 large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis()); 241 large.start(); 242 LOG.debug("Starting for large file={}", large); 243 threads.add(large); 244 } 245 246 // start thread for small file deletion 247 for (int i = 0; i < smallFileDeleteThreadNumber; i++) { 248 Thread small = new Thread() { 249 @Override 250 public void run() { 251 consumerLoop(smallFileQueue); 252 } 253 }; 254 small.setDaemon(true); 255 small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis()); 256 small.start(); 257 LOG.debug("Starting for small files={}", small); 258 threads.add(small); 259 } 260 } 261 262 protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) { 263 try { 264 while (running) { 265 HFileDeleteTask task = null; 266 try { 267 task = queue.take(); 268 } catch (InterruptedException e) { 269 LOG.trace("Interrupted while trying to take a task from queue", e); 270 break; 271 } 272 if (task != null) { 273 LOG.trace("Removing {}", task.filePath); 274 boolean succeed; 275 try { 276 succeed = this.fs.delete(task.filePath, false); 277 } catch (IOException e) { 278 LOG.warn("Failed to delete {}", task.filePath, e); 279 succeed = false; 280 } 281 task.setResult(succeed); 282 if (succeed) { 283 countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue); 284 } 285 } 286 } 287 } finally { 288 LOG.debug("Exit {}", Thread.currentThread()); 289 } 290 } 291 292 // Currently only for testing purpose 293 private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) { 294 if (isLargeFile) { 295 if (deletedLargeFiles.get() == Long.MAX_VALUE) { 296 LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0"); 297 deletedLargeFiles.set(0L); 298 } 299 deletedLargeFiles.incrementAndGet(); 300 } else { 301 if (deletedSmallFiles.get() == Long.MAX_VALUE) { 302 LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0"); 303 deletedSmallFiles.set(0L); 304 } 305 if (fromLargeQueue) { 306 LOG.trace("Stolen a small file deletion task in large file thread"); 307 } 308 deletedSmallFiles.incrementAndGet(); 309 } 310 } 311 312 /** 313 * Stop threads for hfile deletion 314 */ 315 private void stopHFileDeleteThreads() { 316 running = false; 317 LOG.debug("Stopping file delete threads"); 318 for(Thread thread: threads){ 319 thread.interrupt(); 320 } 321 } 322 323 private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() { 324 325 @Override 326 public int compare(HFileDeleteTask o1, HFileDeleteTask o2) { 327 // larger file first so reverse compare 328 int cmp = Long.compare(o2.fileLength, o1.fileLength); 329 if (cmp != 0) { 330 return cmp; 331 } 332 // just use hashCode to generate a stable result. 333 return System.identityHashCode(o1) - System.identityHashCode(o2); 334 } 335 }; 336 337 private static final class HFileDeleteTask { 338 339 boolean done = false; 340 boolean result; 341 final Path filePath; 342 final long fileLength; 343 final long timeoutMsec; 344 345 public HFileDeleteTask(FileStatus file, long timeoutMsec) { 346 this.filePath = file.getPath(); 347 this.fileLength = file.getLen(); 348 this.timeoutMsec = timeoutMsec; 349 } 350 351 public synchronized void setResult(boolean result) { 352 this.done = true; 353 this.result = result; 354 notify(); 355 } 356 357 public synchronized boolean getResult(long waitIfNotFinished) { 358 long waitTimeMsec = 0; 359 try { 360 while (!done) { 361 long startTimeNanos = System.nanoTime(); 362 wait(waitIfNotFinished); 363 waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, 364 TimeUnit.NANOSECONDS); 365 if (done) { 366 return this.result; 367 } 368 if (waitTimeMsec > timeoutMsec) { 369 LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath 370 + ", exit..."); 371 return false; 372 } 373 } 374 } catch (InterruptedException e) { 375 LOG.warn("Interrupted while waiting for result of deleting " + filePath 376 + ", will return false", e); 377 return false; 378 } 379 return this.result; 380 } 381 } 382 383 public List<Thread> getCleanerThreads() { 384 return threads; 385 } 386 387 public long getNumOfDeletedLargeFiles() { 388 return deletedLargeFiles.get(); 389 } 390 391 public long getNumOfDeletedSmallFiles() { 392 return deletedSmallFiles.get(); 393 } 394 395 public long getLargeQueueInitSize() { 396 return largeQueueInitSize; 397 } 398 399 public long getSmallQueueInitSize() { 400 return smallQueueInitSize; 401 } 402 403 public long getThrottlePoint() { 404 return throttlePoint; 405 } 406 407 long getCleanerThreadTimeoutMsec() { 408 return cleanerThreadTimeoutMsec; 409 } 410 411 long getCleanerThreadCheckIntervalMsec() { 412 return cleanerThreadCheckIntervalMsec; 413 } 414 415 @Override 416 public void onConfigurationChange(Configuration conf) { 417 if (!checkAndUpdateConfigurations(conf)) { 418 LOG.debug("Update configuration triggered but nothing changed for this cleaner"); 419 return; 420 } 421 stopHFileDeleteThreads(); 422 // record the left over tasks 423 List<HFileDeleteTask> leftOverTasks = 424 new ArrayList<>(largeFileQueue.size() + smallFileQueue.size()); 425 leftOverTasks.addAll(largeFileQueue); 426 leftOverTasks.addAll(smallFileQueue); 427 largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR); 428 smallFileQueue = largeFileQueue.getStealFromQueue(); 429 threads.clear(); 430 startHFileDeleteThreads(); 431 // re-dispatch the left over tasks 432 for (HFileDeleteTask task : leftOverTasks) { 433 dispatch(task); 434 } 435 } 436 437 /** 438 * Check new configuration and update settings if value changed 439 * @param conf The new configuration 440 * @return true if any configuration for HFileCleaner changes, false if no change 441 */ 442 private boolean checkAndUpdateConfigurations(Configuration conf) { 443 boolean updated = false; 444 int throttlePoint = 445 conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); 446 if (throttlePoint != this.throttlePoint) { 447 LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint); 448 this.throttlePoint = throttlePoint; 449 updated = true; 450 } 451 int largeQueueInitSize = 452 conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); 453 if (largeQueueInitSize != this.largeQueueInitSize) { 454 LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize, 455 largeQueueInitSize); 456 this.largeQueueInitSize = largeQueueInitSize; 457 updated = true; 458 } 459 int smallQueueInitSize = 460 conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); 461 if (smallQueueInitSize != this.smallQueueInitSize) { 462 LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize, 463 smallQueueInitSize); 464 this.smallQueueInitSize = smallQueueInitSize; 465 updated = true; 466 } 467 int largeFileDeleteThreadNumber = 468 conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); 469 if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) { 470 LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}", 471 this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber); 472 this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber; 473 updated = true; 474 } 475 int smallFileDeleteThreadNumber = 476 conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); 477 if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) { 478 LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}", 479 this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber); 480 this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber; 481 updated = true; 482 } 483 long cleanerThreadTimeoutMsec = 484 conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC); 485 if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) { 486 this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec; 487 updated = true; 488 } 489 long cleanerThreadCheckIntervalMsec = 490 conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC, 491 DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC); 492 if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) { 493 this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec; 494 updated = true; 495 } 496 return updated; 497 } 498 499 @Override 500 public synchronized void cancel(boolean mayInterruptIfRunning) { 501 super.cancel(mayInterruptIfRunning); 502 for (Thread t: this.threads) { 503 t.interrupt(); 504 } 505 } 506}