001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.cleaner; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Comparator; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileStatus; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Stoppable; 034import org.apache.hadoop.hbase.io.HFileLink; 035import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 036import org.apache.hadoop.hbase.util.StealJobQueue; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 041/** 042 * This Chore, every time it runs, will clear the HFiles in the hfile archive 043 * folder that are deletable for each HFile cleaner in the chain. 044 */ 045@InterfaceAudience.Private 046public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> { 047 048 public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins"; 049 050 public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, 051 Path directory) { 052 this(period, stopper, conf, fs, directory, null); 053 } 054 055 // Configuration key for large/small throttle point 056 public final static String HFILE_DELETE_THROTTLE_THRESHOLD = 057 "hbase.regionserver.thread.hfilecleaner.throttle"; 058 public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M 059 060 // Configuration key for large queue initial size 061 public final static String LARGE_HFILE_QUEUE_INIT_SIZE = 062 "hbase.regionserver.hfilecleaner.large.queue.size"; 063 public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240; 064 065 // Configuration key for small queue initial size 066 public final static String SMALL_HFILE_QUEUE_INIT_SIZE = 067 "hbase.regionserver.hfilecleaner.small.queue.size"; 068 public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240; 069 070 // Configuration key for large file delete thread number 071 public final static String LARGE_HFILE_DELETE_THREAD_NUMBER = 072 "hbase.regionserver.hfilecleaner.large.thread.count"; 073 public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1; 074 075 // Configuration key for small file delete thread number 076 public final static String SMALL_HFILE_DELETE_THREAD_NUMBER = 077 "hbase.regionserver.hfilecleaner.small.thread.count"; 078 public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1; 079 080 public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC = 081 "hbase.regionserver.hfilecleaner.thread.timeout.msec"; 082 @VisibleForTesting 083 static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L; 084 085 public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 086 "hbase.regionserver.hfilecleaner.thread.check.interval.msec"; 087 @VisibleForTesting 088 static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L; 089 090 private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class); 091 092 StealJobQueue<HFileDeleteTask> largeFileQueue; 093 BlockingQueue<HFileDeleteTask> smallFileQueue; 094 private int throttlePoint; 095 private int largeQueueInitSize; 096 private int smallQueueInitSize; 097 private int largeFileDeleteThreadNumber; 098 private int smallFileDeleteThreadNumber; 099 private long cleanerThreadTimeoutMsec; 100 private long cleanerThreadCheckIntervalMsec; 101 private List<Thread> threads = new ArrayList<Thread>(); 102 private boolean running; 103 104 private AtomicLong deletedLargeFiles = new AtomicLong(); 105 private AtomicLong deletedSmallFiles = new AtomicLong(); 106 107 /** 108 * @param period the period of time to sleep between each run 109 * @param stopper the stopper 110 * @param conf configuration to use 111 * @param fs handle to the FS 112 * @param directory directory to be cleaned 113 * @param params params could be used in subclass of BaseHFileCleanerDelegate 114 */ 115 public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, 116 Path directory, Map<String, Object> params) { 117 super("HFileCleaner", period, stopper, conf, fs, 118 directory, MASTER_HFILE_CLEANER_PLUGINS, params); 119 throttlePoint = 120 conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); 121 largeQueueInitSize = 122 conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); 123 smallQueueInitSize = 124 conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); 125 largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR); 126 smallFileQueue = largeFileQueue.getStealFromQueue(); 127 largeFileDeleteThreadNumber = 128 conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); 129 smallFileDeleteThreadNumber = 130 conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); 131 cleanerThreadTimeoutMsec = 132 conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC); 133 cleanerThreadCheckIntervalMsec = 134 conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC, 135 DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC); 136 startHFileDeleteThreads(); 137 } 138 139 @Override 140 protected boolean validate(Path file) { 141 if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) { 142 return true; 143 } 144 return StoreFileInfo.validateStoreFileName(file.getName()); 145 } 146 147 /** 148 * Exposed for TESTING! 149 */ 150 public List<BaseHFileCleanerDelegate> getDelegatesForTesting() { 151 return this.cleanersChain; 152 } 153 154 @Override 155 public int deleteFiles(Iterable<FileStatus> filesToDelete) { 156 int deletedFiles = 0; 157 List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>(); 158 // construct delete tasks and add into relative queue 159 for (FileStatus file : filesToDelete) { 160 HFileDeleteTask task = deleteFile(file); 161 if (task != null) { 162 tasks.add(task); 163 } 164 } 165 // wait for each submitted task to finish 166 for (HFileDeleteTask task : tasks) { 167 if (task.getResult(cleanerThreadCheckIntervalMsec)) { 168 deletedFiles++; 169 } 170 } 171 return deletedFiles; 172 } 173 174 /** 175 * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue 176 * @param file the file to delete 177 * @return HFileDeleteTask to track progress 178 */ 179 private HFileDeleteTask deleteFile(FileStatus file) { 180 HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec); 181 boolean enqueued = dispatch(task); 182 return enqueued ? task : null; 183 } 184 185 private boolean dispatch(HFileDeleteTask task) { 186 if (task.fileLength >= this.throttlePoint) { 187 if (!this.largeFileQueue.offer(task)) { 188 // should never arrive here as long as we use PriorityQueue 189 LOG.trace("Large file deletion queue is full"); 190 return false; 191 } 192 } else { 193 if (!this.smallFileQueue.offer(task)) { 194 // should never arrive here as long as we use PriorityQueue 195 LOG.trace("Small file deletion queue is full"); 196 return false; 197 } 198 } 199 return true; 200 } 201 202 @Override 203 public synchronized void cleanup() { 204 super.cleanup(); 205 stopHFileDeleteThreads(); 206 } 207 208 /** 209 * Start threads for hfile deletion 210 */ 211 private void startHFileDeleteThreads() { 212 final String n = Thread.currentThread().getName(); 213 running = true; 214 // start thread for large file deletion 215 for (int i = 0; i < largeFileDeleteThreadNumber; i++) { 216 Thread large = new Thread() { 217 @Override 218 public void run() { 219 consumerLoop(largeFileQueue); 220 } 221 }; 222 large.setDaemon(true); 223 large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis()); 224 large.start(); 225 LOG.debug("Starting for large file={}", large); 226 threads.add(large); 227 } 228 229 // start thread for small file deletion 230 for (int i = 0; i < smallFileDeleteThreadNumber; i++) { 231 Thread small = new Thread() { 232 @Override 233 public void run() { 234 consumerLoop(smallFileQueue); 235 } 236 }; 237 small.setDaemon(true); 238 small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis()); 239 small.start(); 240 LOG.debug("Starting for small files={}", small); 241 threads.add(small); 242 } 243 } 244 245 protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) { 246 try { 247 while (running) { 248 HFileDeleteTask task = null; 249 try { 250 task = queue.take(); 251 } catch (InterruptedException e) { 252 LOG.trace("Interrupted while trying to take a task from queue", e); 253 break; 254 } 255 if (task != null) { 256 LOG.trace("Removing {}", task.filePath); 257 boolean succeed; 258 try { 259 succeed = this.fs.delete(task.filePath, false); 260 } catch (IOException e) { 261 LOG.warn("Failed to delete {}", task.filePath, e); 262 succeed = false; 263 } 264 task.setResult(succeed); 265 if (succeed) { 266 countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue); 267 } 268 } 269 } 270 } finally { 271 LOG.debug("Exit {}", Thread.currentThread()); 272 } 273 } 274 275 // Currently only for testing purpose 276 private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) { 277 if (isLargeFile) { 278 if (deletedLargeFiles.get() == Long.MAX_VALUE) { 279 LOG.debug("Deleted more than Long.MAX_VALUE large files, reset counter to 0"); 280 deletedLargeFiles.set(0L); 281 } 282 deletedLargeFiles.incrementAndGet(); 283 } else { 284 if (deletedSmallFiles.get() == Long.MAX_VALUE) { 285 LOG.debug("Deleted more than Long.MAX_VALUE small files, reset counter to 0"); 286 deletedSmallFiles.set(0L); 287 } 288 if (fromLargeQueue) { 289 LOG.trace("Stolen a small file deletion task in large file thread"); 290 } 291 deletedSmallFiles.incrementAndGet(); 292 } 293 } 294 295 /** 296 * Stop threads for hfile deletion 297 */ 298 private void stopHFileDeleteThreads() { 299 running = false; 300 LOG.debug("Stopping file delete threads"); 301 for(Thread thread: threads){ 302 thread.interrupt(); 303 } 304 } 305 306 private static final Comparator<HFileDeleteTask> COMPARATOR = new Comparator<HFileDeleteTask>() { 307 308 @Override 309 public int compare(HFileDeleteTask o1, HFileDeleteTask o2) { 310 // larger file first so reverse compare 311 int cmp = Long.compare(o2.fileLength, o1.fileLength); 312 if (cmp != 0) { 313 return cmp; 314 } 315 // just use hashCode to generate a stable result. 316 return System.identityHashCode(o1) - System.identityHashCode(o2); 317 } 318 }; 319 320 private static final class HFileDeleteTask { 321 322 boolean done = false; 323 boolean result; 324 final Path filePath; 325 final long fileLength; 326 final long timeoutMsec; 327 328 public HFileDeleteTask(FileStatus file, long timeoutMsec) { 329 this.filePath = file.getPath(); 330 this.fileLength = file.getLen(); 331 this.timeoutMsec = timeoutMsec; 332 } 333 334 public synchronized void setResult(boolean result) { 335 this.done = true; 336 this.result = result; 337 notify(); 338 } 339 340 public synchronized boolean getResult(long waitIfNotFinished) { 341 long waitTimeMsec = 0; 342 try { 343 while (!done) { 344 long startTimeNanos = System.nanoTime(); 345 wait(waitIfNotFinished); 346 waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, 347 TimeUnit.NANOSECONDS); 348 if (done) { 349 return this.result; 350 } 351 if (waitTimeMsec > timeoutMsec) { 352 LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath 353 + ", exit..."); 354 return false; 355 } 356 } 357 } catch (InterruptedException e) { 358 LOG.warn("Interrupted while waiting for result of deleting " + filePath 359 + ", will return false", e); 360 return false; 361 } 362 return this.result; 363 } 364 } 365 366 @VisibleForTesting 367 public List<Thread> getCleanerThreads() { 368 return threads; 369 } 370 371 @VisibleForTesting 372 public long getNumOfDeletedLargeFiles() { 373 return deletedLargeFiles.get(); 374 } 375 376 @VisibleForTesting 377 public long getNumOfDeletedSmallFiles() { 378 return deletedSmallFiles.get(); 379 } 380 381 @VisibleForTesting 382 public long getLargeQueueInitSize() { 383 return largeQueueInitSize; 384 } 385 386 @VisibleForTesting 387 public long getSmallQueueInitSize() { 388 return smallQueueInitSize; 389 } 390 391 @VisibleForTesting 392 public long getThrottlePoint() { 393 return throttlePoint; 394 } 395 396 @VisibleForTesting 397 long getCleanerThreadTimeoutMsec() { 398 return cleanerThreadTimeoutMsec; 399 } 400 401 @VisibleForTesting 402 long getCleanerThreadCheckIntervalMsec() { 403 return cleanerThreadCheckIntervalMsec; 404 } 405 406 @Override 407 public void onConfigurationChange(Configuration conf) { 408 super.onConfigurationChange(conf); 409 410 if (!checkAndUpdateConfigurations(conf)) { 411 LOG.debug("Update configuration triggered but nothing changed for this cleaner"); 412 return; 413 } 414 stopHFileDeleteThreads(); 415 // record the left over tasks 416 List<HFileDeleteTask> leftOverTasks = 417 new ArrayList<>(largeFileQueue.size() + smallFileQueue.size()); 418 leftOverTasks.addAll(largeFileQueue); 419 leftOverTasks.addAll(smallFileQueue); 420 largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR); 421 smallFileQueue = largeFileQueue.getStealFromQueue(); 422 threads.clear(); 423 startHFileDeleteThreads(); 424 // re-dispatch the left over tasks 425 for (HFileDeleteTask task : leftOverTasks) { 426 dispatch(task); 427 } 428 } 429 430 /** 431 * Check new configuration and update settings if value changed 432 * @param conf The new configuration 433 * @return true if any configuration for HFileCleaner changes, false if no change 434 */ 435 private boolean checkAndUpdateConfigurations(Configuration conf) { 436 boolean updated = false; 437 int throttlePoint = 438 conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); 439 if (throttlePoint != this.throttlePoint) { 440 LOG.debug("Updating throttle point, from {} to {}", this.throttlePoint, throttlePoint); 441 this.throttlePoint = throttlePoint; 442 updated = true; 443 } 444 int largeQueueInitSize = 445 conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); 446 if (largeQueueInitSize != this.largeQueueInitSize) { 447 LOG.debug("Updating largeQueueInitSize, from {} to {}", this.largeQueueInitSize, 448 largeQueueInitSize); 449 this.largeQueueInitSize = largeQueueInitSize; 450 updated = true; 451 } 452 int smallQueueInitSize = 453 conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); 454 if (smallQueueInitSize != this.smallQueueInitSize) { 455 LOG.debug("Updating smallQueueInitSize, from {} to {}", this.smallQueueInitSize, 456 smallQueueInitSize); 457 this.smallQueueInitSize = smallQueueInitSize; 458 updated = true; 459 } 460 int largeFileDeleteThreadNumber = 461 conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); 462 if (largeFileDeleteThreadNumber != this.largeFileDeleteThreadNumber) { 463 LOG.debug("Updating largeFileDeleteThreadNumber, from {} to {}", 464 this.largeFileDeleteThreadNumber, largeFileDeleteThreadNumber); 465 this.largeFileDeleteThreadNumber = largeFileDeleteThreadNumber; 466 updated = true; 467 } 468 int smallFileDeleteThreadNumber = 469 conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); 470 if (smallFileDeleteThreadNumber != this.smallFileDeleteThreadNumber) { 471 LOG.debug("Updating smallFileDeleteThreadNumber, from {} to {}", 472 this.smallFileDeleteThreadNumber, smallFileDeleteThreadNumber); 473 this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber; 474 updated = true; 475 } 476 long cleanerThreadTimeoutMsec = 477 conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC); 478 if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) { 479 this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec; 480 updated = true; 481 } 482 long cleanerThreadCheckIntervalMsec = 483 conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC, 484 DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC); 485 if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) { 486 this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec; 487 updated = true; 488 } 489 return updated; 490 } 491 492 @Override 493 public synchronized void cancel(boolean mayInterruptIfRunning) { 494 super.cancel(mayInterruptIfRunning); 495 for (Thread t: this.threads) { 496 t.interrupt(); 497 } 498 } 499}