/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

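/**
 * Utility methods used by the MOB file cleaner chore to find and archive MOB files that are no
 * longer referenced by any store file of a MOB-enabled table.
 */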
@InterfaceAudience.Private
public final class MobFileCleanupUtil {

  private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanupUtil.class);

  private MobFileCleanupUtil() {
  }

  /**
   * Performs housekeeping file cleaning (called by the MOB Cleaner chore).
   * @param conf  configuration
   * @param table table name
   * @param admin admin instance used to read the table descriptor
   * @throws IOException exception
   */
  public static void cleanupObsoleteMobFiles(Configuration conf, TableName table, Admin admin)
    throws IOException {
    long minAgeToArchive =
      conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
    // We check only those MOB files whose creation time is less than
    // maxCreationTimeToArchive, i.e. the current time minus minAgeToArchive (1 hour by default).
    // This gap gives us confidence that all corresponding store files will exist at the time the
    // cleaning procedure begins and will be examined. So, if a MOB file's creation time is
    // greater than maxCreationTimeToArchive, the file is skipped and won't be archived.
    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
    TableDescriptor htd = admin.getDescriptor(table);
    List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
    if (list.isEmpty()) {
      LOG.info("Skipping non-MOB table [{}]", table);
      return;
    } else {
      LOG.info("Only MOB files whose creation time is older than {} will be archived, table={}",
        maxCreationTimeToArchive, table);
    }

    FileSystem fs = FileSystem.get(conf);
    Set<String> regionNames = new HashSet<>();
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
    List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);

    Set<String> allActiveMobFileName = new HashSet<String>();
    for (Path regionPath : regionDirs) {
      regionNames.add(regionPath.getName());
      HRegionFileSystem regionFS =
        HRegionFileSystem.create(conf, fs, tableDir, MobUtils.getMobRegionInfo(table));
      for (ColumnFamilyDescriptor hcd : list) {
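        // List store files through the configured StoreFileTracker so that both the default and
        // file-based tracking implementations are honored when collecting MOB references.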
        StoreFileTracker sft = StoreFileTrackerFactory.create(conf, htd, hcd, regionFS, false);
        String family = hcd.getNameAsString();
        Path storePath = new Path(regionPath, family);
        boolean succeed = false;
        Set<String> regionMobs = new HashSet<String>();

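        // Retry reading this store from scratch if a store file disappears while its metadata is
        // being read; 'succeed' is set only after a full pass completes without a
        // FileNotFoundException.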
        while (!succeed) {
          if (!fs.exists(storePath)) {
            String errMsg = String.format("Directory %s was deleted during MOB file cleaner chore"
              + " execution, aborting MOB file cleaner chore.", storePath);
            throw new IOException(errMsg);
          }
          List<StoreFileInfo> storeFileInfos = sft.load();
          LOG.info("Found {} store files in: {}", storeFileInfos.size(), storePath);
          Path currentPath = null;
          try {
            for (StoreFileInfo storeFileInfo : storeFileInfos) {
              Path pp = storeFileInfo.getPath();
              currentPath = pp;
              LOG.trace("Store file: {}", pp);
              HStoreFile sf = null;
              byte[] mobRefData = null;
              byte[] bulkloadMarkerData = null;
              try {
                sf = new HStoreFile(storeFileInfo, BloomType.NONE, CacheConfig.DISABLED);
                sf.initReader();
                mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
                bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
                // close store file to avoid memory leaks
                sf.closeStoreFile(true);
              } catch (IOException ex) {
                // When FileBased SFT is active the store dir can contain corrupted or incomplete
                // files. So read errors are expected. We just skip these files.
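                // A missing file most likely means it was removed concurrently (e.g. by a
                // compaction); rethrow so the outer catch restarts the scan for this store.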
                if (ex instanceof FileNotFoundException) {
                  throw ex;
                }
                LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(), ex);
                continue;
              }
              if (mobRefData == null) {
                if (bulkloadMarkerData == null) {
                  LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
                    + "cannot proceed until all old files have been MOB-compacted.", pp);
                  return;
                } else {
                  LOG.debug("Skipping file without MOB references (bulkloaded file): {}", pp);
                  continue;
                }
              }
              // file may or may not have MOB references, but was created by the distributed
              // mob compaction code.
              try {
                SetMultimap<TableName, String> mobs =
                  MobUtils.deserializeMobFileRefs(mobRefData).build();
                LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
                LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
                regionMobs.addAll(mobs.values());
              } catch (RuntimeException exception) {
                throw new IOException("failure getting mob references for hfile " + sf, exception);
              }
            }
          } catch (FileNotFoundException e) {
            LOG.warn(
              "Missing file: {}. Starting MOB cleaning cycle from the beginning due to error",
              currentPath, e);
            regionMobs.clear();
            continue;
          }
          succeed = true;
        }

        // Add MOB references for current region/family
        allActiveMobFileName.addAll(regionMobs);
      } // END column families
    } // END regions
    // Check if the number of active MOB files is too big (over 1M)
    if (allActiveMobFileName.size() > 1000000) {
      LOG.warn("Found too many active MOB files: {}, table={}, "
        + "this may result in high memory pressure.", allActiveMobFileName.size(), table);
    }
    LOG.debug("Found: {} active mob refs for table={}", allActiveMobFileName.size(), table);
    allActiveMobFileName.stream().forEach(LOG::trace);

    // Now scan MOB directories and find MOB files with no references to them
    for (ColumnFamilyDescriptor hcd : list) {
      checkColumnFamilyDescriptor(conf, table, fs, admin, hcd, regionNames,
        maxCreationTimeToArchive);
    }
  }

  private static void checkColumnFamilyDescriptor(Configuration conf, TableName table,
    FileSystem fs, Admin admin, ColumnFamilyDescriptor hcd, Set<String> regionNames,
    long maxCreationTimeToArchive) throws IOException {
    List<Path> toArchive = new ArrayList<Path>();
    String family = hcd.getNameAsString();
    Path dir = MobUtils.getMobFamilyPath(conf, table, family);
    RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
    while (rit.hasNext()) {
      LocatedFileStatus lfs = rit.next();
      Path p = lfs.getPath();
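      // The cleaner assumes the MOB file name ends with the encoded name of the region that
      // created it, appended after the last '_'.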
      String[] mobParts = p.getName().split("_");
      String regionName = mobParts[mobParts.length - 1];

      if (!regionNames.contains(regionName)) {
        // MOB belonged to a region no longer hosted
        long creationTime = fs.getFileStatus(p).getModificationTime();
        if (creationTime < maxCreationTimeToArchive) {
          LOG.trace("Archiving MOB file {} creation time={}", p, creationTime);
          toArchive.add(p);
        } else {
          LOG.trace("Skipping fresh file: {}. Creation time={}", p, creationTime);
        }
      } else {
        LOG.trace("Keeping MOB file with existing region: {}", p);
      }
    }
    LOG.info("MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
      table, family);
    archiveMobFiles(conf, table, admin, Bytes.toBytes(family), toArchive);
    LOG.info("MOB Cleaner archived {} files, table={} family={}", toArchive.size(), table, family);
  }

  /**
   * Archives the mob files.
   * @param conf       The current configuration.
   * @param tableName  The table name.
   * @param admin      The admin instance.
   * @param family     The name of the column family.
   * @param storeFiles The files to be archived.
   * @throws IOException exception
   */
  private static void archiveMobFiles(Configuration conf, TableName tableName, Admin admin,
    byte[] family, List<Path> storeFiles) throws IOException {

    if (storeFiles.isEmpty()) {
      // nothing to remove
      LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
        Bytes.toString(family));
      return;
    }
    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
    FileSystem fs = storeFiles.get(0).getFileSystem(conf);

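    // HFileArchiver moves each file into the table's archive directory rather than deleting it;
    // the archive is cleaned up separately by the HFile cleaner chore.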
    for (Path p : storeFiles) {
      LOG.debug("MOB Cleaner is archiving: {}", p);
      HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
        family, p);
    }
  }
}