001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.TimeUnit;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileStatus;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.conf.ConfigurationObserver;
034import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
041
042/**
043 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
044 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
045 * @see BaseLogCleanerDelegate
046 */
047@InterfaceAudience.Private
048public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
049  implements ConfigurationObserver {
050
051  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName());
052
053  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
054  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
055
056  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
057      "hbase.oldwals.cleaner.thread.timeout.msec";
058  @VisibleForTesting
059  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
060
061  public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
062      "hbase.oldwals.cleaner.thread.check.interval.msec";
063  @VisibleForTesting
064  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;
065
066
067  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
068  private List<Thread> oldWALsCleaner;
069  private long cleanerThreadTimeoutMsec;
070  private long cleanerThreadCheckIntervalMsec;
071
072  /**
073   * @param period the period of time to sleep between each run
074   * @param stopper the stopper
075   * @param conf configuration to use
076   * @param fs handle to the FS
077   * @param oldLogDir the path to the archived logs
078   * @param pool the thread pool used to scan directories
079   */
080  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
081    Path oldLogDir, DirScanPool pool) {
082    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
083      pool);
084    this.pendingDelete = new LinkedBlockingQueue<>();
085    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
086    this.oldWALsCleaner = createOldWalsCleaner(size);
087    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
088      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
089    this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
090      DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
091  }
092
093  @Override
094  protected boolean validate(Path file) {
095    return AbstractFSWALProvider.validateWALFilename(file.getName())
096        || MasterProcedureUtil.validateProcedureWALFilename(file.getName());
097  }
098
099  @Override
100  public void onConfigurationChange(Configuration conf) {
101    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
102    if (newSize == oldWALsCleaner.size()) {
103      if (LOG.isDebugEnabled()) {
104        LOG.debug("Size from configuration is the same as previous which is " +
105          newSize + ", no need to update.");
106      }
107      return;
108    }
109    interruptOldWALsCleaner();
110    oldWALsCleaner = createOldWalsCleaner(newSize);
111    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
112        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
113    cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
114        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
115  }
116
117  @Override
118  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
119    List<CleanerContext> results = new LinkedList<>();
120    for (FileStatus toDelete : filesToDelete) {
121      CleanerContext context = CleanerContext.createCleanerContext(toDelete,
122          cleanerThreadTimeoutMsec);
123      if (context != null) {
124        pendingDelete.add(context);
125        results.add(context);
126      }
127    }
128
129    int deletedFiles = 0;
130    for (CleanerContext res : results) {
131      deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0;
132    }
133    return deletedFiles;
134  }
135
136  @Override
137  public synchronized void cleanup() {
138    super.cleanup();
139    interruptOldWALsCleaner();
140  }
141
142  @VisibleForTesting
143  int getSizeOfCleaners() {
144    return oldWALsCleaner.size();
145  }
146
147  @VisibleForTesting
148  long getCleanerThreadTimeoutMsec() {
149    return cleanerThreadTimeoutMsec;
150  }
151
152  @VisibleForTesting
153  long getCleanerThreadCheckIntervalMsec() {
154    return cleanerThreadCheckIntervalMsec;
155  }
156
157  private List<Thread> createOldWalsCleaner(int size) {
158    LOG.info("Creating OldWALs cleaners with size=" + size);
159
160    List<Thread> oldWALsCleaner = new ArrayList<>(size);
161    for (int i = 0; i < size; i++) {
162      Thread cleaner = new Thread(() -> deleteFile());
163      cleaner.setName("OldWALsCleaner-" + i);
164      cleaner.setDaemon(true);
165      cleaner.start();
166      oldWALsCleaner.add(cleaner);
167    }
168    return oldWALsCleaner;
169  }
170
171  private void interruptOldWALsCleaner() {
172    for (Thread cleaner : oldWALsCleaner) {
173      cleaner.interrupt();
174    }
175    oldWALsCleaner.clear();
176  }
177
178  private void deleteFile() {
179    while (true) {
180      CleanerContext context = null;
181      boolean succeed = false;
182      boolean interrupted = false;
183      try {
184        context = pendingDelete.take();
185        if (context != null) {
186          FileStatus toClean = context.getTargetToClean();
187          succeed = this.fs.delete(toClean.getPath(), false);
188        }
189      } catch (InterruptedException ite) {
190        // It's most likely from configuration changing request
191        if (context != null) {
192          LOG.warn("Interrupted while cleaning oldWALs " +
193              context.getTargetToClean() + ", try to clean it next round.");
194        }
195        interrupted = true;
196      } catch (IOException e) {
197        // fs.delete() fails.
198        LOG.warn("Failed to clean oldwals with exception: " + e);
199        succeed = false;
200      } finally {
201        if (context != null) {
202          context.setResult(succeed);
203        }
204        if (interrupted) {
205          // Restore interrupt status
206          Thread.currentThread().interrupt();
207          break;
208        }
209      }
210    }
211    if (LOG.isDebugEnabled()) {
212      LOG.debug("Exiting cleaner.");
213    }
214  }
215
216  @Override
217  public synchronized void cancel(boolean mayInterruptIfRunning) {
218    super.cancel(mayInterruptIfRunning);
219    for (Thread t : oldWALsCleaner) {
220      t.interrupt();
221    }
222  }
223
224  private static final class CleanerContext {
225
226    final FileStatus target;
227    volatile boolean result;
228    volatile boolean setFromCleaner = false;
229    long timeoutMsec;
230
231    static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
232      return status != null ? new CleanerContext(status, timeoutMsec) : null;
233    }
234
235    private CleanerContext(FileStatus status, long timeoutMsec) {
236      this.target = status;
237      this.result = false;
238      this.timeoutMsec = timeoutMsec;
239    }
240
241    synchronized void setResult(boolean res) {
242      this.result = res;
243      this.setFromCleaner = true;
244      notify();
245    }
246
247    synchronized boolean getResult(long waitIfNotFinished) {
248      long totalTimeMsec = 0;
249      try {
250        while (!setFromCleaner) {
251          long startTimeNanos = System.nanoTime();
252          wait(waitIfNotFinished);
253          totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
254              TimeUnit.NANOSECONDS);
255          if (totalTimeMsec >= timeoutMsec) {
256            LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
257            return result;
258          }
259        }
260      } catch (InterruptedException e) {
261        LOG.warn("Interrupted while waiting deletion of " + target);
262        return result;
263      }
264      return result;
265    }
266
267    FileStatus getTargetToClean() {
268      return target;
269    }
270  }
271}