001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Stoppable;
034import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
035import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
040
041/**
042 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
043 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
044 * @see BaseLogCleanerDelegate
045 */
046@InterfaceAudience.Private
047public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
048  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName());
049
050  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
051  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
052
053  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
054      "hbase.oldwals.cleaner.thread.timeout.msec";
055  @VisibleForTesting
056  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
057
058  public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
059      "hbase.oldwals.cleaner.thread.check.interval.msec";
060  @VisibleForTesting
061  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;
062
063
064  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
065  private List<Thread> oldWALsCleaner;
066  private long cleanerThreadTimeoutMsec;
067  private long cleanerThreadCheckIntervalMsec;
068
069  /**
070   * @param period the period of time to sleep between each run
071   * @param stopper the stopper
072   * @param conf configuration to use
073   * @param fs handle to the FS
074   * @param oldLogDir the path to the archived logs
075   */
076  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
077      Path oldLogDir) {
078    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
079    this.pendingDelete = new LinkedBlockingQueue<>();
080    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
081    this.oldWALsCleaner = createOldWalsCleaner(size);
082    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
083        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
084    this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
085        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
086  }
087
088  @Override
089  protected boolean validate(Path file) {
090    return AbstractFSWALProvider.validateWALFilename(file.getName())
091        || MasterProcedureUtil.validateProcedureWALFilename(file.getName());
092  }
093
094  @Override
095  public void onConfigurationChange(Configuration conf) {
096    super.onConfigurationChange(conf);
097
098    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
099    if (newSize == oldWALsCleaner.size()) {
100      if (LOG.isDebugEnabled()) {
101        LOG.debug("Size from configuration is the same as previous which is " +
102          newSize + ", no need to update.");
103      }
104      return;
105    }
106    interruptOldWALsCleaner();
107    oldWALsCleaner = createOldWalsCleaner(newSize);
108    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
109        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
110    cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
111        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
112  }
113
114  @Override
115  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
116    List<CleanerContext> results = new LinkedList<>();
117    for (FileStatus toDelete : filesToDelete) {
118      CleanerContext context = CleanerContext.createCleanerContext(toDelete,
119          cleanerThreadTimeoutMsec);
120      if (context != null) {
121        pendingDelete.add(context);
122        results.add(context);
123      }
124    }
125
126    int deletedFiles = 0;
127    for (CleanerContext res : results) {
128      deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0;
129    }
130    return deletedFiles;
131  }
132
133  @Override
134  public synchronized void cleanup() {
135    super.cleanup();
136    interruptOldWALsCleaner();
137  }
138
139  @VisibleForTesting
140  int getSizeOfCleaners() {
141    return oldWALsCleaner.size();
142  }
143
144  @VisibleForTesting
145  long getCleanerThreadTimeoutMsec() {
146    return cleanerThreadTimeoutMsec;
147  }
148
149  @VisibleForTesting
150  long getCleanerThreadCheckIntervalMsec() {
151    return cleanerThreadCheckIntervalMsec;
152  }
153
154  private List<Thread> createOldWalsCleaner(int size) {
155    LOG.info("Creating OldWALs cleaners with size=" + size);
156
157    List<Thread> oldWALsCleaner = new ArrayList<>(size);
158    for (int i = 0; i < size; i++) {
159      Thread cleaner = new Thread(() -> deleteFile());
160      cleaner.setName("OldWALsCleaner-" + i);
161      cleaner.setDaemon(true);
162      cleaner.start();
163      oldWALsCleaner.add(cleaner);
164    }
165    return oldWALsCleaner;
166  }
167
168  private void interruptOldWALsCleaner() {
169    for (Thread cleaner : oldWALsCleaner) {
170      cleaner.interrupt();
171    }
172    oldWALsCleaner.clear();
173  }
174
175  private void deleteFile() {
176    while (true) {
177      CleanerContext context = null;
178      boolean succeed = false;
179      boolean interrupted = false;
180      try {
181        context = pendingDelete.take();
182        if (context != null) {
183          FileStatus toClean = context.getTargetToClean();
184          succeed = this.fs.delete(toClean.getPath(), false);
185        }
186      } catch (InterruptedException ite) {
187        // It's most likely from configuration changing request
188        if (context != null) {
189          LOG.warn("Interrupted while cleaning oldWALs " +
190              context.getTargetToClean() + ", try to clean it next round.");
191        }
192        interrupted = true;
193      } catch (IOException e) {
194        // fs.delete() fails.
195        LOG.warn("Failed to clean oldwals with exception: " + e);
196        succeed = false;
197      } finally {
198        if (context != null) {
199          context.setResult(succeed);
200        }
201        if (interrupted) {
202          // Restore interrupt status
203          Thread.currentThread().interrupt();
204          break;
205        }
206      }
207    }
208    if (LOG.isDebugEnabled()) {
209      LOG.debug("Exiting cleaner.");
210    }
211  }
212
213  @Override
214  public synchronized void cancel(boolean mayInterruptIfRunning) {
215    super.cancel(mayInterruptIfRunning);
216    for (Thread t : oldWALsCleaner) {
217      t.interrupt();
218    }
219  }
220
221  private static final class CleanerContext {
222
223    final FileStatus target;
224    volatile boolean result;
225    volatile boolean setFromCleaner = false;
226    long timeoutMsec;
227
228    static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
229      return status != null ? new CleanerContext(status, timeoutMsec) : null;
230    }
231
232    private CleanerContext(FileStatus status, long timeoutMsec) {
233      this.target = status;
234      this.result = false;
235      this.timeoutMsec = timeoutMsec;
236    }
237
238    synchronized void setResult(boolean res) {
239      this.result = res;
240      this.setFromCleaner = true;
241      notify();
242    }
243
244    synchronized boolean getResult(long waitIfNotFinished) {
245      long totalTimeMsec = 0;
246      try {
247        while (!setFromCleaner) {
248          long startTimeNanos = System.nanoTime();
249          wait(waitIfNotFinished);
250          totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
251              TimeUnit.NANOSECONDS);
252          if (totalTimeMsec >= timeoutMsec) {
253            LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
254            return result;
255          }
256        }
257      } catch (InterruptedException e) {
258        LOG.warn("Interrupted while waiting deletion of " + target);
259        return result;
260      }
261      return result;
262    }
263
264    FileStatus getTargetToClean() {
265      return target;
266    }
267  }
268}