001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.stream.Collectors;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.conf.ConfigurationObserver;
037import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
038import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
039import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
046
047/**
048 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
049 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
050 * @see BaseLogCleanerDelegate
051 */
052@InterfaceAudience.Private
053public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
054  implements ConfigurationObserver {
055  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
056
057  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
058  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
059
060  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
061      "hbase.oldwals.cleaner.thread.timeout.msec";
062  @VisibleForTesting
063  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
064
065  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
066  private List<Thread> oldWALsCleaner;
067  private long cleanerThreadTimeoutMsec;
068
069  /**
070   * @param period the period of time to sleep between each run
071   * @param stopper the stopper
072   * @param conf configuration to use
073   * @param fs handle to the FS
074   * @param oldLogDir the path to the archived logs
075   * @param pool the thread pool used to scan directories
076   */
077  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
078    Path oldLogDir, DirScanPool pool) {
079    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
080      pool);
081    this.pendingDelete = new LinkedBlockingQueue<>();
082    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
083    this.oldWALsCleaner = createOldWalsCleaner(size);
084    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
085      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
086  }
087
088  @Override
089  protected boolean validate(Path file) {
090    return AbstractFSWALProvider.validateWALFilename(file.getName()) ||
091      MasterProcedureUtil.validateProcedureWALFilename(file.getName()) ||
092      file.getName().endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
093  }
094
095  @Override
096  public void onConfigurationChange(Configuration conf) {
097    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
098    if (newSize == oldWALsCleaner.size()) {
099      LOG.debug("Size from configuration is the same as previous which "
100          + "is {}, no need to update.", newSize);
101      return;
102    }
103    interruptOldWALsCleaner();
104    oldWALsCleaner = createOldWalsCleaner(newSize);
105    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
106        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
107  }
108
109  @Override
110  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
111    List<CleanerContext> results = new ArrayList<>();
112    for (FileStatus file : filesToDelete) {
113      LOG.trace("Scheduling file {} for deletion", file);
114      if (file != null) {
115        results.add(new CleanerContext(file));
116      }
117    }
118    if (results.isEmpty()) {
119      return 0;
120    }
121
122    LOG.debug("Old WALs for delete: {}",
123      results.stream().map(cc -> cc.target.getPath().getName()).
124        collect(Collectors.joining(", ")));
125    pendingDelete.addAll(results);
126
127    int deletedFiles = 0;
128    for (CleanerContext res : results) {
129      LOG.trace("Awaiting the results for deletion of old WAL file: {}", res);
130      deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0;
131    }
132    return deletedFiles;
133  }
134
135  @Override
136  public synchronized void cleanup() {
137    super.cleanup();
138    interruptOldWALsCleaner();
139  }
140
141  @VisibleForTesting
142  int getSizeOfCleaners() {
143    return oldWALsCleaner.size();
144  }
145
146  @VisibleForTesting
147  long getCleanerThreadTimeoutMsec() {
148    return cleanerThreadTimeoutMsec;
149  }
150
151  private List<Thread> createOldWalsCleaner(int size) {
152    LOG.info("Creating {} old WALs cleaner threads", size);
153
154    List<Thread> oldWALsCleaner = new ArrayList<>(size);
155    for (int i = 0; i < size; i++) {
156      Thread cleaner = new Thread(() -> deleteFile());
157      cleaner.setName("OldWALsCleaner-" + i);
158      cleaner.setDaemon(true);
159      cleaner.start();
160      oldWALsCleaner.add(cleaner);
161    }
162    return oldWALsCleaner;
163  }
164
165  private void interruptOldWALsCleaner() {
166    for (Thread cleaner : oldWALsCleaner) {
167      LOG.trace("Interrupting thread: {}", cleaner);
168      cleaner.interrupt();
169    }
170    oldWALsCleaner.clear();
171  }
172
173  private void deleteFile() {
174    while (true) {
175      try {
176        final CleanerContext context = pendingDelete.take();
177        Preconditions.checkNotNull(context);
178        FileStatus oldWalFile = context.getTargetToClean();
179        try {
180          LOG.debug("Deleting {}", oldWalFile);
181          boolean succeed = this.fs.delete(oldWalFile.getPath(), false);
182          context.setResult(succeed);
183        } catch (IOException e) {
184          // fs.delete() fails.
185          LOG.warn("Failed to delete old WAL file", e);
186          context.setResult(false);
187        }
188      } catch (InterruptedException ite) {
189        // It is most likely from configuration changing request
190        LOG.warn("Interrupted while cleaning old WALs, will "
191            + "try to clean it next round. Exiting.");
192        // Restore interrupt status
193        Thread.currentThread().interrupt();
194        return;
195      }
196      LOG.trace("Exiting");
197    }
198  }
199
200  @Override
201  public synchronized void cancel(boolean mayInterruptIfRunning) {
202    LOG.debug("Cancelling LogCleaner");
203    super.cancel(mayInterruptIfRunning);
204    interruptOldWALsCleaner();
205  }
206
207  private static final class CleanerContext {
208
209    final FileStatus target;
210    final AtomicBoolean result;
211    final CountDownLatch remainingResults;
212
213    private CleanerContext(FileStatus status) {
214      this.target = status;
215      this.result = new AtomicBoolean(false);
216      this.remainingResults = new CountDownLatch(1);
217    }
218
219    void setResult(boolean res) {
220      this.result.set(res);
221      this.remainingResults.countDown();
222    }
223
224    boolean getResult(long waitIfNotFinished) {
225      try {
226        boolean completed = this.remainingResults.await(waitIfNotFinished,
227            TimeUnit.MILLISECONDS);
228        if (!completed) {
229          LOG.warn("Spent too much time [{}ms] deleting old WAL file: {}",
230              waitIfNotFinished, target);
231          return false;
232        }
233      } catch (InterruptedException e) {
234        LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target);
235        return false;
236      }
237      return result.get();
238    }
239
240    FileStatus getTargetToClean() {
241      return target;
242    }
243
244    @Override
245    public String toString() {
246      return "CleanerContext [target=" + target + ", result=" + result + "]";
247    }
248  }
249}