001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.cleaner;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Stoppable;
036import org.apache.hadoop.hbase.conf.ConfigurationObserver;
037import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
038import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
039import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
045
046/**
047 * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
048 * logs folder. The WAL is only deleted if none of the cleaner delegates says otherwise.
049 * @see BaseLogCleanerDelegate
050 */
051@InterfaceAudience.Private
052public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
053  implements ConfigurationObserver {
054  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
055
056  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
057  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
058
059  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
060    "hbase.oldwals.cleaner.thread.timeout.msec";
061  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
062
063  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
064  private List<Thread> oldWALsCleaner;
065  private long cleanerThreadTimeoutMsec;
066
067  /**
068   * @param period    the period of time to sleep between each run
069   * @param stopper   the stopper
070   * @param conf      configuration to use
071   * @param fs        handle to the FS
072   * @param oldLogDir the path to the archived logs
073   * @param pool      the thread pool used to scan directories
074   */
075  public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
076    Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
077    super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
078      pool, params, null);
079    this.pendingDelete = new LinkedBlockingQueue<>();
080    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
081    if (size <= 0) {
082      size = DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE;
083      LOG.warn(
084        "The configuration {} has been set to an invalid value {}, "
085          + "the default value {} will be used.",
086        OLD_WALS_CLEANER_THREAD_SIZE, size, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
087    }
088    this.oldWALsCleaner = createOldWalsCleaner(size);
089    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
090      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
091  }
092
093  @Override
094  protected boolean validate(Path file) {
095    return AbstractFSWALProvider.validateWALFilename(file.getName())
096      || MasterProcedureUtil.validateProcedureWALFilename(file.getName())
097      || file.getName().endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
098  }
099
100  @Override
101  public void onConfigurationChange(Configuration conf) {
102    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
103    if (newSize <= 0) {
104      LOG.debug(
105        "The configuration {} has been set to an invalid value {}, "
106          + "the previous value {} will be used, no need to update.",
107        OLD_WALS_CLEANER_THREAD_SIZE, newSize, oldWALsCleaner.size());
108      return;
109    }
110    if (newSize == oldWALsCleaner.size()) {
111      LOG.debug(
112        "Size from configuration is the same as previous which " + "is {}, no need to update.",
113        newSize);
114      return;
115    }
116    interruptOldWALsCleaner();
117    oldWALsCleaner = createOldWalsCleaner(newSize);
118    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
119      DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
120  }
121
122  @Override
123  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
124    List<CleanerContext> results = new ArrayList<>();
125    for (FileStatus file : filesToDelete) {
126      LOG.trace("Scheduling file {} for deletion", file);
127      if (file != null) {
128        results.add(new CleanerContext(file));
129      }
130    }
131    if (results.isEmpty()) {
132      return 0;
133    }
134
135    LOG.debug("Old WALs for delete: {}",
136      results.stream().map(cc -> cc.target.getPath().getName()).collect(Collectors.joining(", ")));
137    pendingDelete.addAll(results);
138
139    int deletedFiles = 0;
140    for (CleanerContext res : results) {
141      LOG.trace("Awaiting the results for deletion of old WAL file: {}", res);
142      deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0;
143    }
144    return deletedFiles;
145  }
146
147  @Override
148  public synchronized void cleanup() {
149    super.cleanup();
150    interruptOldWALsCleaner();
151  }
152
153  int getSizeOfCleaners() {
154    return oldWALsCleaner.size();
155  }
156
157  long getCleanerThreadTimeoutMsec() {
158    return cleanerThreadTimeoutMsec;
159  }
160
161  private List<Thread> createOldWalsCleaner(int size) {
162    LOG.info("Creating {} old WALs cleaner threads", size);
163
164    List<Thread> oldWALsCleaner = new ArrayList<>(size);
165    for (int i = 0; i < size; i++) {
166      Thread cleaner = new Thread(() -> deleteFile());
167      cleaner.setName("OldWALsCleaner-" + i);
168      cleaner.setDaemon(true);
169      cleaner.start();
170      oldWALsCleaner.add(cleaner);
171    }
172    return oldWALsCleaner;
173  }
174
175  private void interruptOldWALsCleaner() {
176    for (Thread cleaner : oldWALsCleaner) {
177      LOG.trace("Interrupting thread: {}", cleaner);
178      cleaner.interrupt();
179    }
180    oldWALsCleaner.clear();
181  }
182
183  private void deleteFile() {
184    while (true) {
185      try {
186        final CleanerContext context = pendingDelete.take();
187        Preconditions.checkNotNull(context);
188        FileStatus oldWalFile = context.getTargetToClean();
189        try {
190          LOG.debug("Deleting {}", oldWalFile);
191          boolean succeed = this.fs.delete(oldWalFile.getPath(), false);
192          context.setResult(succeed);
193        } catch (IOException e) {
194          // fs.delete() fails.
195          LOG.warn("Failed to delete old WAL file", e);
196          context.setResult(false);
197        }
198      } catch (InterruptedException ite) {
199        // It is most likely from configuration changing request
200        LOG.warn(
201          "Interrupted while cleaning old WALs, will " + "try to clean it next round. Exiting.");
202        // Restore interrupt status
203        Thread.currentThread().interrupt();
204        return;
205      }
206      LOG.trace("Exiting");
207    }
208  }
209
210  @Override
211  public synchronized void cancel(boolean mayInterruptIfRunning) {
212    LOG.debug("Cancelling LogCleaner");
213    super.cancel(mayInterruptIfRunning);
214    interruptOldWALsCleaner();
215  }
216
217  private static final class CleanerContext {
218
219    final FileStatus target;
220    final AtomicBoolean result;
221    final CountDownLatch remainingResults;
222
223    private CleanerContext(FileStatus status) {
224      this.target = status;
225      this.result = new AtomicBoolean(false);
226      this.remainingResults = new CountDownLatch(1);
227    }
228
229    void setResult(boolean res) {
230      this.result.set(res);
231      this.remainingResults.countDown();
232    }
233
234    boolean getResult(long waitIfNotFinished) {
235      try {
236        boolean completed = this.remainingResults.await(waitIfNotFinished, TimeUnit.MILLISECONDS);
237        if (!completed) {
238          LOG.warn("Spent too much time [{}ms] deleting old WAL file: {}", waitIfNotFinished,
239            target);
240          return false;
241        }
242      } catch (InterruptedException e) {
243        LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target);
244        return false;
245      }
246      return result.get();
247    }
248
249    FileStatus getTargetToClean() {
250      return target;
251    }
252
253    @Override
254    public String toString() {
255      return "CleanerContext [target=" + target + ", result=" + result + "]";
256    }
257  }
258}