001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicBoolean;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Stoppable;
034import org.apache.hadoop.hbase.conf.ConfigurationObserver;
035import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
036import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
043
044/**
045 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
046 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
047 * @see BaseLogCleanerDelegate
048 */
049@InterfaceAudience.Private
050public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
051  implements ConfigurationObserver {
052  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
053
054  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
055  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
056
057  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
058      "hbase.oldwals.cleaner.thread.timeout.msec";
059  @VisibleForTesting
060  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
061
062  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
063  private List<Thread> oldWALsCleaner;
064  private long cleanerThreadTimeoutMsec;
065
066  /**
067   * @param period the period of time to sleep between each run
068   * @param stopper the stopper
069   * @param conf configuration to use
070   * @param fs handle to the FS
071   * @param oldLogDir the path to the archived logs
072   * @param pool the thread pool used to scan directories
073   */
074  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
075    Path oldLogDir, DirScanPool pool) {
076    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
077      pool);
078    this.pendingDelete = new LinkedBlockingQueue<>();
079    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
080    this.oldWALsCleaner = createOldWalsCleaner(size);
081    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
082      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
083  }
084
085  @Override
086  protected boolean validate(Path file) {
087    return AbstractFSWALProvider.validateWALFilename(file.getName())
088        || MasterProcedureUtil.validateProcedureWALFilename(file.getName());
089  }
090
091  @Override
092  public void onConfigurationChange(Configuration conf) {
093    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
094    if (newSize == oldWALsCleaner.size()) {
095      LOG.debug("Size from configuration is the same as previous which "
096          + "is {}, no need to update.", newSize);
097      return;
098    }
099    interruptOldWALsCleaner();
100    oldWALsCleaner = createOldWalsCleaner(newSize);
101    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
102        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
103  }
104
105  @Override
106  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
107    List<CleanerContext> results = new ArrayList<>();
108    for (FileStatus file : filesToDelete) {
109      LOG.trace("Scheduling file {} for deletion", file);
110      if (file != null) {
111        results.add(new CleanerContext(file));
112      }
113    }
114
115    LOG.debug("Old WAL files pending deletion: {}", results);
116    pendingDelete.addAll(results);
117
118    int deletedFiles = 0;
119    for (CleanerContext res : results) {
120      LOG.trace("Awaiting the results for deletion of old WAL file: {}", res);
121      deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0;
122    }
123    return deletedFiles;
124  }
125
126  @Override
127  public synchronized void cleanup() {
128    super.cleanup();
129    interruptOldWALsCleaner();
130  }
131
132  @VisibleForTesting
133  int getSizeOfCleaners() {
134    return oldWALsCleaner.size();
135  }
136
137  @VisibleForTesting
138  long getCleanerThreadTimeoutMsec() {
139    return cleanerThreadTimeoutMsec;
140  }
141
142  private List<Thread> createOldWalsCleaner(int size) {
143    LOG.info("Creating {} OldWALs cleaner threads", size);
144
145    List<Thread> oldWALsCleaner = new ArrayList<>(size);
146    for (int i = 0; i < size; i++) {
147      Thread cleaner = new Thread(() -> deleteFile());
148      cleaner.setName("OldWALsCleaner-" + i);
149      cleaner.setDaemon(true);
150      cleaner.start();
151      oldWALsCleaner.add(cleaner);
152    }
153    return oldWALsCleaner;
154  }
155
156  private void interruptOldWALsCleaner() {
157    for (Thread cleaner : oldWALsCleaner) {
158      LOG.trace("Interrupting thread: {}", cleaner);
159      cleaner.interrupt();
160    }
161    oldWALsCleaner.clear();
162  }
163
164  private void deleteFile() {
165    while (true) {
166      try {
167        final CleanerContext context = pendingDelete.take();
168        Preconditions.checkNotNull(context);
169        FileStatus oldWalFile = context.getTargetToClean();
170        try {
171          LOG.debug("Attempting to delete old WAL file: {}", oldWalFile);
172          boolean succeed = this.fs.delete(oldWalFile.getPath(), false);
173          context.setResult(succeed);
174        } catch (IOException e) {
175          // fs.delete() fails.
176          LOG.warn("Failed to clean old WAL file", e);
177          context.setResult(false);
178        }
179      } catch (InterruptedException ite) {
180        // It is most likely from configuration changing request
181        LOG.warn("Interrupted while cleaning old WALs, will "
182            + "try to clean it next round. Exiting.");
183        // Restore interrupt status
184        Thread.currentThread().interrupt();
185        return;
186      }
187      LOG.debug("Exiting");
188    }
189  }
190
191  @Override
192  public synchronized void cancel(boolean mayInterruptIfRunning) {
193    LOG.debug("Cancelling LogCleaner");
194    super.cancel(mayInterruptIfRunning);
195    interruptOldWALsCleaner();
196  }
197
198  private static final class CleanerContext {
199
200    final FileStatus target;
201    final AtomicBoolean result;
202    final CountDownLatch remainingResults;
203
204    private CleanerContext(FileStatus status) {
205      this.target = status;
206      this.result = new AtomicBoolean(false);
207      this.remainingResults = new CountDownLatch(1);
208    }
209
210    void setResult(boolean res) {
211      this.result.set(res);
212      this.remainingResults.countDown();
213    }
214
215    boolean getResult(long waitIfNotFinished) {
216      try {
217        boolean completed = this.remainingResults.await(waitIfNotFinished,
218            TimeUnit.MILLISECONDS);
219        if (!completed) {
220          LOG.warn("Spend too much time [{}ms] to delete old WAL file: {}",
221              waitIfNotFinished, target);
222          return false;
223        }
224      } catch (InterruptedException e) {
225        LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target);
226        return false;
227      }
228      return result.get();
229    }
230
231    FileStatus getTargetToClean() {
232      return target;
233    }
234
235    @Override
236    public String toString() {
237      return "CleanerContext [target=" + target + ", result=" + result + "]";
238    }
239  }
240}