/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This chore, every time it runs, deletes the unused HFiles left in the data folder of stores
 * that write new files directly into the data directory rather than into a tmp directory first.
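 * <p>
 * A minimal sketch of how the cleaner might be enabled and tuned (the keys are the constants
 * defined below; the values are only illustrative, not recommendations):
 *
 * <pre>
 * Configuration conf = ...;
 * conf.setBoolean(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, true);
 * // consider candidates for deletion only once they are at least 12h old
 * conf.setLong(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, 12 * 60 * 60 * 1000L);
 * // run every 6h, starting 2h after region server startup
 * conf.setInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 6 * 60 * 60 * 1000);
 * conf.setInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 2 * 60 * 60 * 1000);
 * </pre>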
 */
@InterfaceAudience.Private
public class BrokenStoreFileCleaner extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
  public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
    "hbase.region.broken.storefilecleaner.enabled";
  public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
  public static final String BROKEN_STOREFILE_CLEANER_TTL =
    "hbase.region.broken.storefilecleaner.ttl";
  public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; // 12h
  public static final String BROKEN_STOREFILE_CLEANER_DELAY =
    "hbase.region.broken.storefilecleaner.delay";
  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; // 2h
  public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
    "hbase.region.broken.storefilecleaner.delay.jitter";
  public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
  public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
    "hbase.region.broken.storefilecleaner.period";
  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; // 6h

  private HRegionServer regionServer;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  private long fileTtl;

  public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
    Configuration conf, HRegionServer regionServer) {
    super("BrokenStoreFileCleaner", stopper, period, delay);
    this.regionServer = regionServer;
    setEnabled(
      conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
    fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
  }
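
  // A minimal usage sketch, assuming a caller similar to the region server wiring (stopper and
  // choreService here are hypothetical placeholders; in reality the chore is constructed and
  // scheduled by HRegionServer):
  //
  //   BrokenStoreFileCleaner cleaner = new BrokenStoreFileCleaner(
  //     conf.getInt(BROKEN_STOREFILE_CLEANER_DELAY, DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY),
  //     conf.getInt(BROKEN_STOREFILE_CLEANER_PERIOD, DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD),
  //     stopper, conf, regionServer);
  //   choreService.scheduleChore(cleaner);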

  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  public boolean getEnabled() {
    return this.enabled.get();
  }

  @Override
  public void chore() {
    if (getEnabled()) {
      long start = EnvironmentEdgeManager.currentTime();
      AtomicLong deletedFiles = new AtomicLong(0);
      AtomicLong failedDeletes = new AtomicLong(0);
      for (HRegion region : regionServer.getRegions()) {
        for (HStore store : region.getStores()) {
          // only do cleanup in stores not using tmp directories
          if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
            continue;
          }
          Path storePath =
            new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());

          try {
            List<FileStatus> fsStoreFiles =
              Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
            fsStoreFiles
              .forEach(file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
          } catch (IOException e) {
            LOG.warn("Failed to list files in {}, cleanup is skipped there", storePath, e);
          }
        }
      }
      LOG.debug(
        "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
          + "to delete {}",
        regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
        deletedFiles.get(), failedDeletes.get());
    } else {
      LOG.trace("BrokenStoreFileCleaner chore is disabled! Not cleaning.");
    }
  }

  private void cleanFileIfNeeded(FileStatus file, HStore store, AtomicLong deletedFiles,
    AtomicLong failedDeletes) {
    if (file.isDirectory()) {
      LOG.trace("This is a directory {}, skip cleanup", file.getPath());
      return;
    }

    if (!validate(file.getPath())) {
      LOG.trace("Invalid file {}, skip cleanup", file.getPath());
      return;
    }

    if (!isOldEnough(file)) {
      LOG.trace("Fresh file {}, skip cleanup", file.getPath());
      return;
    }

    if (isActiveStorefile(file, store)) {
      LOG.trace("Actively used storefile {}, skip cleanup", file.getPath());
      return;
    }

    // Compacted files can still have readers and are cleaned by a separate chore, so they have to
    // be skipped here
    if (isCompactedFile(file, store)) {
      LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
      return;
    }

    if (isCompactionResultFile(file, store)) {
      LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
      return;
    }

    // Checking region availability here is enough to prevent accidental deletion of intact
    // HFiles. If the region is still available, i.e. not closed or closing, all the checks above
    // are valid and it is safe to delete the 'broken' files. If the region is closed or closing
    // by the time we reach this point, we always give up deleting, so the only possible
    // inconsistency is that the file really is broken (the region was still open while the checks
    // above ran) but we give up deleting it here. Either way we never accidentally delete an
    // intact HFile; at worst we leave a broken file behind, and the next run of the cleaner can
    // still delete it.
    if (!store.getHRegion().isAvailable()) {
      LOG.trace("This store's region {} is no longer open, so it might have moved elsewhere. "
        + "Skipping cleanup.", store.getRegionInfo().getEncodedName());
      return;
    }

    deleteFile(file, store, deletedFiles, failedDeletes);
  }

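  // Files that an in-flight compaction is currently writing must be skipped: they are not yet
  // part of the store's live file set, but deleting them would break the compaction when it
  // tries to commit its result.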
  private boolean isCompactionResultFile(FileStatus file, HStore store) {
    return store.getStoreFilesBeingWritten().contains(file.getPath());
  }

  // Compacted files can still have readers and are cleaned by a separate chore, so they have to
  // be skipped here
  private boolean isCompactedFile(FileStatus file, HStore store) {
    return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
      .anyMatch(sf -> sf.getPath().equals(file.getPath()));
  }

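  // A file that belongs to the store's current set of live storefiles is in active use and is
  // never a deletion candidate.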
  private boolean isActiveStorefile(FileStatus file, HStore store) {
    return store.getStoreEngine().getStoreFileManager().getStoreFiles().stream()
      .anyMatch(sf -> sf.getPath().equals(file.getPath()));
  }

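  // Only paths that look like storefiles are considered for cleanup: either the file sits in an
  // HFileLink back-reference directory, or its name matches the storefile naming convention.
  // Anything else is left alone.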
  boolean validate(Path file) {
    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
      return true;
    }
    return StoreFileInfo.validateStoreFileName(file.getName());
  }

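  // The TTL guards against deleting files that are still being written: a freshly created file
  // may simply belong to a writer that has not yet registered it with the store.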
  boolean isOldEnough(FileStatus file) {
    return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
  }

  private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
    AtomicLong failedDeletes) {
    Path filePath = file.getPath();
    LOG.debug("Removing {} from store", filePath);
    try {
      boolean success = store.getFileSystem().delete(filePath, false);
      if (!success) {
        failedDeletes.incrementAndGet();
        LOG.warn("Attempted to delete {}, but couldn't. Will attempt to delete it again on the "
          + "next pass.", filePath);
      } else {
        deletedFiles.incrementAndGet();
      }
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      LOG.warn("Error while deleting: {}", filePath, e);
    }
  }

}