/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.StealJobQueue;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * This Chore, every time it runs, will clear the HFiles in the hfile archive
 * folder that are deletable for each HFile cleaner in the chain.
 * <p>
 * Deletions are not performed on the chore thread itself. Each file is wrapped in a delete task
 * and offered to either the large-file queue or the small-file queue, depending on whether its
 * length reaches the configured throttle point. Dedicated daemon threads consume each queue and
 * issue the actual file system delete; the chore thread then waits for each task's result.
 */
@InterfaceAudience.Private
public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
    implements ConfigurationObserver {

  public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";

  /**
   * Creates a cleaner without extra delegate parameters; equivalent to calling the full
   * constructor with {@code params == null}.
   * @param period the period of time to sleep between each run
   * @param stopper the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param directory directory to be cleaned
   * @param pool the thread pool used to scan directories
   */
  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
      Path directory, DirScanPool pool) {
    this(period, stopper, conf, fs, directory, pool, null);
  }

  // Configuration key for large/small throttle point
  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
      "hbase.regionserver.thread.hfilecleaner.throttle";
  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M

  // Configuration key for large queue initial size
  public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
      "hbase.regionserver.hfilecleaner.large.queue.size";
  public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;

  // Configuration key for small queue initial size
  public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
      "hbase.regionserver.hfilecleaner.small.queue.size";
  public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;

  // Configuration key for large file delete thread number
  public final static String LARGE_HFILE_DELETE_THREAD_NUMBER =
      "hbase.regionserver.hfilecleaner.large.thread.count";
  public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1;

  // Configuration key for small file delete thread number
  public final static String SMALL_HFILE_DELETE_THREAD_NUMBER =
      "hbase.regionserver.hfilecleaner.small.thread.count";
  public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;

  // Configuration key for the total time to wait for a single delete task to finish
  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
      "hbase.regionserver.hfilecleaner.thread.timeout.msec";
  @VisibleForTesting
  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;

  // Configuration key for the length of each individual wait while polling a delete task
  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
      "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
  @VisibleForTesting
  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;

  private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);

  StealJobQueue<HFileDeleteTask> largeFileQueue;
  BlockingQueue<HFileDeleteTask> smallFileQueue;
  private int throttlePoint;
  private int largeQueueInitSize;
  private int smallQueueInitSize;
  private int largeFileDeleteThreadNumber;
  private int smallFileDeleteThreadNumber;
  private long cleanerThreadTimeoutMsec;
  private long cleanerThreadCheckIntervalMsec;
  private final List<Thread> threads = new ArrayList<>();
  // Written by whichever thread starts/stops the workers and read by every worker thread in
  // consumerLoop, so it must be volatile for the workers to reliably observe a stop request.
  private volatile boolean running;

  private final AtomicLong deletedLargeFiles = new AtomicLong();
  private final AtomicLong deletedSmallFiles = new AtomicLong();

  /**
   * Reads the cleaner settings from {@code conf}, creates the large/small task queues and starts
   * the delete threads.
   * @param period the period of time to sleep between each run
   * @param stopper the stopper
   * @param conf configuration to use
   * @param fs handle to the FS
   * @param directory directory to be cleaned
   * @param pool the thread pool used to scan directories
   * @param params params could be used in subclass of BaseHFileCleanerDelegate
   */
  public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
      Path directory, DirScanPool pool, Map<String, Object> params) {
    super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
      params);
    throttlePoint =
        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
    largeQueueInitSize =
        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
    smallQueueInitSize =
        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
    // the small queue is the queue the large queue steals from
    smallFileQueue = largeFileQueue.getStealFromQueue();
    largeFileDeleteThreadNumber =
        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
    smallFileDeleteThreadNumber =
        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
    cleanerThreadTimeoutMsec =
        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
    cleanerThreadCheckIntervalMsec =
        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
    startHFileDeleteThreads();
  }

  /**
   * Accepts a path if it, or its parent, is an HFileLink back-references directory; otherwise
   * defers to {@link StoreFileInfo#validateStoreFileName(String)} on the file name.
   * @param file the path to check
   * @return true if the path is considered valid for this cleaner
   */
  @Override
  protected boolean validate(Path file) {
    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
      return true;
    }
    return StoreFileInfo.validateStoreFileName(file.getName());
  }

  /**
   * Exposed for TESTING!
   */
  public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
    return this.cleanersChain;
  }

  /**
   * Enqueues a delete task for every given file, then waits for each enqueued task to report its
   * result.
   * @param filesToDelete the files to delete
   * @return the number of tasks that reported a successful delete
   */
  @Override
  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
    int deletedFiles = 0;
    List<HFileDeleteTask> tasks = new ArrayList<>();
    // construct delete tasks and add into relative queue
    for (FileStatus file : filesToDelete) {
      HFileDeleteTask task = deleteFile(file);
      if (task != null) {
        tasks.add(task);
      }
    }
    // wait for each submitted task to finish
    for (HFileDeleteTask task : tasks) {
      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
        deletedFiles++;
      }
    }
    return deletedFiles;
  }

  /**
   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
   * @param file the file to delete
   * @return HFileDeleteTask to track progress, or null if the task could not be enqueued
   */
  private HFileDeleteTask deleteFile(FileStatus file) {
    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
    boolean enqueued = dispatch(task);
    return enqueued ? task : null;
  }

  /**
   * Offers the task to the large queue when its file length reaches the throttle point, otherwise
   * to the small queue.
   * @param task the task to enqueue
   * @return true if the chosen queue accepted the task
   */
  private boolean dispatch(HFileDeleteTask task) {
    if (task.fileLength >= this.throttlePoint) {
      if (!this.largeFileQueue.offer(task)) {
        // should never arrive here as long as we use PriorityQueue
        LOG.trace("Large file deletion queue is full");
        return false;
      }
    } else {
      if (!this.smallFileQueue.offer(task)) {
        // should never arrive here as long as we use PriorityQueue
        LOG.trace("Small file deletion queue is full");
        return false;
      }
    }
    return true;
  }

  /**
   * Runs the parent cleanup and then stops the hfile delete threads.
   */
  @Override
  public synchronized void cleanup() {
    super.cleanup();
    stopHFileDeleteThreads();
  }

  /**
   * Start threads for hfile deletion
   */
  private void startHFileDeleteThreads() {
    final String n = Thread.currentThread().getName();
    running = true;
    // start thread for large file deletion
    for (int i = 0; i < largeFileDeleteThreadNumber; i++) {
      Thread large = new Thread() {
        @Override
        public void run() {
          consumerLoop(largeFileQueue);
        }
      };
      large.setDaemon(true);
      large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis());
      large.start();
      LOG.debug("Starting for large file={}", large);
      threads.add(large);
    }

    // start thread for small file deletion
    for (int i = 0; i < smallFileDeleteThreadNumber; i++) {
      Thread small = new Thread() {
        @Override
        public void run() {
          consumerLoop(smallFileQueue);
        }
      };
      small.setDaemon(true);
      small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis());
      small.start();
      LOG.debug("Starting for small files={}", small);
      threads.add(small);
    }
  }

  /**
   * Body of each delete thread: repeatedly takes a task from the given queue, deletes the task's
   * file (non-recursively) and publishes the outcome on the task. The loop ends when the cleaner
   * is no longer running or the thread is interrupted while waiting for a task.
   * @param queue the queue this thread consumes from
   */
  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
    try {
      while (running) {
        HFileDeleteTask task = null;
        try {
          task = queue.take();
        } catch (InterruptedException e) {
          LOG.trace("Interrupted while trying to take a task from queue", e);
          break;
        }
        if (task != null) {
          LOG.trace("Removing {}", task.filePath);
          boolean succeed;
          try {
            succeed = this.fs.delete(task.filePath, false);
          } catch (IOException e) {
            LOG.warn("Failed to delete {}", task.filePath, e);
            succeed = false;
          }
          // always publish a result so the waiting chore thread is released
          task.setResult(succeed);
          if (succeed) {
            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
          }
        }
      }
    } finally {
      LOG.debug("Exit {}", Thread.currentThread());
    }
  }

  // Currently only for testing purpose
  private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
    if (isLargeFile) {
      if (deletedLargeFiles.get() == Long.MAX_VALUE) {
        LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
        deletedLargeFiles.set(0L);
      }
      deletedLargeFiles.incrementAndGet();
    } else {
      if (deletedSmallFiles.get() == Long.MAX_VALUE) {
        LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
        deletedSmallFiles.set(0L);
      }
      if (fromLargeQueue) {
        LOG.trace("Stolen a small file deletion task in large file thread");
      }
      deletedSmallFiles.incrementAndGet();
    }
  }

  /**
   * Stop threads for hfile deletion
   */
  private void stopHFileDeleteThreads() {
    running = false;
    LOG.debug("Stopping file delete threads");
    for (Thread thread : threads) {
      thread.interrupt();
    }
  }

  /**
   * Orders delete tasks by descending file length. Tasks of equal length are ordered by identity
   * hash code so that two distinct tasks do not normally compare as equal.
   */
  private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() {

    @Override
    public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
      // larger file first so reverse compare
      int cmp = Long.compare(o2.fileLength, o1.fileLength);
      if (cmp != 0) {
        return cmp;
      }
      // just use hashCode to generate a stable result. Integer.compare is used instead of
      // subtraction because a difference of two ints can overflow and report the wrong sign.
      return Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
    }
  };

  /**
   * A pending deletion of one file. A delete thread publishes the outcome through
   * {@link #setResult(boolean)}; the submitting thread collects it through
   * {@link #getResult(long)}. Both methods synchronize on the task itself.
   */
  private static final class HFileDeleteTask {

    boolean done = false;
    boolean result;
    final Path filePath;
    final long fileLength;
    final long timeoutMsec;

    /**
     * @param file the file to delete; its path and length are captured here
     * @param timeoutMsec the total time {@link #getResult(long)} may wait before giving up
     */
    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
      this.filePath = file.getPath();
      this.fileLength = file.getLen();
      this.timeoutMsec = timeoutMsec;
    }

    /**
     * Records the outcome of the delete and wakes up a thread waiting in
     * {@link #getResult(long)}.
     * @param result whether the delete succeeded
     */
    public synchronized void setResult(boolean result) {
      this.done = true;
      this.result = result;
      notify();
    }

    /**
     * Waits for the task to finish, in slices of at most {@code waitIfNotFinished} milliseconds,
     * until the accumulated waiting time exceeds this task's timeout.
     * @param waitIfNotFinished the length of each individual wait, in milliseconds
     * @return the recorded result if the task finished; false if the wait timed out or the
     *         waiting thread was interrupted
     */
    public synchronized boolean getResult(long waitIfNotFinished) {
      long waitTimeMsec = 0;
      try {
        while (!done) {
          long startTimeNanos = System.nanoTime();
          wait(waitIfNotFinished);
          waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
            TimeUnit.NANOSECONDS);
          if (done) {
            return this.result;
          }
          if (waitTimeMsec > timeoutMsec) {
            LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
                + ", exit...");
            return false;
          }
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for result of deleting " + filePath
            + ", will return false", e);
        return false;
      }
      return this.result;
    }
  }

  /**
   * @return the live list of delete threads held by this cleaner (not a copy)
   */
  @VisibleForTesting
  public List<Thread> getCleanerThreads() {
    return threads;
  }

  /**
   * @return the number of successfully deleted files whose length reached the throttle point
   */
  @VisibleForTesting
  public long getNumOfDeletedLargeFiles() {
    return deletedLargeFiles.get();
  }

  /**
   * @return the number of successfully deleted files whose length was below the throttle point
   */
  @VisibleForTesting
  public long getNumOfDeletedSmallFiles() {
    return deletedSmallFiles.get();
  }

  /**
   * @return the currently configured initial size of the large file queue
   */
  @VisibleForTesting
  public long getLargeQueueInitSize() {
    return largeQueueInitSize;
  }

  /**
   * @return the currently configured initial size of the small file queue
   */
  @VisibleForTesting
  public long getSmallQueueInitSize() {
    return smallQueueInitSize;
  }

  /**
   * @return the file length at or above which a file is dispatched to the large file queue
   */
  @VisibleForTesting
  public long getThrottlePoint() {
    return throttlePoint;
  }

  /**
   * @return the currently configured total wait timeout for a delete task, in milliseconds
   */
  @VisibleForTesting
  long getCleanerThreadTimeoutMsec() {
    return cleanerThreadTimeoutMsec;
  }

  /**
   * @return the currently configured length of each individual wait, in milliseconds
   */
  @VisibleForTesting
  long getCleanerThreadCheckIntervalMsec() {
    return cleanerThreadCheckIntervalMsec;
  }

  /**
   * Applies a new configuration. If any cleaner setting changed, the delete threads are stopped,
   * the queues are rebuilt with the new sizes, new threads are started and the tasks that were
   * still queued are dispatched again.
   * @param conf the new configuration
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    if (!checkAndUpdateConfigurations(conf)) {
      LOG.debug("Update configuration triggered but nothing changed for this cleaner");
      return;
    }
    stopHFileDeleteThreads();
    // record the left over tasks
    List<HFileDeleteTask> leftOverTasks =
        new ArrayList<>(largeFileQueue.size() + smallFileQueue.size());
    leftOverTasks.addAll(largeFileQueue);
    leftOverTasks.addAll(smallFileQueue);
    largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
    smallFileQueue = largeFileQueue.getStealFromQueue();
    threads.clear();
    startHFileDeleteThreads();
    // re-dispatch the left over tasks
    for (HFileDeleteTask task : leftOverTasks) {
      dispatch(task);
    }
  }

  /**
   * Check new configuration and update settings if value changed
   * @param conf The new configuration
   * @return true if any configuration for HFileCleaner changes, false if no change
   */
  private boolean checkAndUpdateConfigurations(Configuration conf) {
    boolean updated = false;
    int newThrottlePoint =
        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
    if (newThrottlePoint != this.throttlePoint) {
      LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, newThrottlePoint);
      this.throttlePoint = newThrottlePoint;
      updated = true;
    }
    int newLargeQueueInitSize =
        conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
    if (newLargeQueueInitSize != this.largeQueueInitSize) {
      LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize,
        newLargeQueueInitSize);
      this.largeQueueInitSize = newLargeQueueInitSize;
      updated = true;
    }
    int newSmallQueueInitSize =
        conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
    if (newSmallQueueInitSize != this.smallQueueInitSize) {
      LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize,
        newSmallQueueInitSize);
      this.smallQueueInitSize = newSmallQueueInitSize;
      updated = true;
    }
    int newLargeThreadNumber =
        conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
    if (newLargeThreadNumber != this.largeFileDeleteThreadNumber) {
      LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}",
        this.largeFileDeleteThreadNumber, newLargeThreadNumber);
      this.largeFileDeleteThreadNumber = newLargeThreadNumber;
      updated = true;
    }
    int newSmallThreadNumber =
        conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
    if (newSmallThreadNumber != this.smallFileDeleteThreadNumber) {
      LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}",
        this.smallFileDeleteThreadNumber, newSmallThreadNumber);
      this.smallFileDeleteThreadNumber = newSmallThreadNumber;
      updated = true;
    }
    long newThreadTimeoutMsec =
        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
    if (newThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
      LOG.debug("Updating cleanerThreadTimeoutMsec, from {} to {}",
        this.cleanerThreadTimeoutMsec, newThreadTimeoutMsec);
      this.cleanerThreadTimeoutMsec = newThreadTimeoutMsec;
      updated = true;
    }
    long newThreadCheckIntervalMsec =
        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
            DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
    if (newThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
      LOG.debug("Updating cleanerThreadCheckIntervalMsec, from {} to {}",
        this.cleanerThreadCheckIntervalMsec, newThreadCheckIntervalMsec);
      this.cleanerThreadCheckIntervalMsec = newThreadCheckIntervalMsec;
      updated = true;
    }
    return updated;
  }

  /**
   * Cancels the chore and interrupts every hfile delete thread.
   * @param mayInterruptIfRunning passed through to the parent implementation
   */
  @Override
  public synchronized void cancel(boolean mayInterruptIfRunning) {
    super.cancel(mayInterruptIfRunning);
    for (Thread t : this.threads) {
      t.interrupt();
    }
  }
}