001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.TimeUnit;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.LocatedFileStatus;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.RemoteIterator;
034import org.apache.hadoop.hbase.ScheduledChore;
035import org.apache.hadoop.hbase.TableDescriptors;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.backup.HFileArchiver;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.io.hfile.CacheConfig;
044import org.apache.hadoop.hbase.master.HMaster;
045import org.apache.hadoop.hbase.regionserver.BloomType;
046import org.apache.hadoop.hbase.regionserver.HStoreFile;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.CommonFSUtils;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.hadoop.hbase.util.FSUtils;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
056
057/**
058 * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
059 * (files which have no active references to) mob files.
060 */
061@InterfaceAudience.Private
062public class MobFileCleanerChore extends ScheduledChore {
063
064  private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class);
065  private final HMaster master;
066  private ExpiredMobFileCleaner cleaner;
067
068  static {
069    Configuration.addDeprecation(MobConstants.DEPRECATED_MOB_CLEANER_PERIOD,
070      MobConstants.MOB_CLEANER_PERIOD);
071  }
072
073  public MobFileCleanerChore(HMaster master) {
074    super(master.getServerName() + "-MobFileCleanerChore", master,
075      master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
076        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
077      master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
078        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
079      TimeUnit.SECONDS);
080    this.master = master;
081    cleaner = new ExpiredMobFileCleaner();
082    cleaner.setConf(master.getConfiguration());
083    checkObsoleteConfigurations();
084  }
085
086  private void checkObsoleteConfigurations() {
087    Configuration conf = master.getConfiguration();
088
089    if (conf.get("hbase.mob.compaction.mergeable.threshold") != null) {
090      LOG.warn("'hbase.mob.compaction.mergeable.threshold' is obsolete and not used anymore.");
091    }
092    if (conf.get("hbase.mob.delfile.max.count") != null) {
093      LOG.warn("'hbase.mob.delfile.max.count' is obsolete and not used anymore.");
094    }
095    if (conf.get("hbase.mob.compaction.threads.max") != null) {
096      LOG.warn("'hbase.mob.compaction.threads.max' is obsolete and not used anymore.");
097    }
098    if (conf.get("hbase.mob.compaction.batch.size") != null) {
099      LOG.warn("'hbase.mob.compaction.batch.size' is obsolete and not used anymore.");
100    }
101  }
102
103  @RestrictedApi(explanation = "Should only be called in tests", link = "",
104      allowedOnPath = ".*/src/test/.*")
105  public MobFileCleanerChore() {
106    this.master = null;
107  }
108
109  @Override
110  protected void chore() {
111    TableDescriptors htds = master.getTableDescriptors();
112
113    Map<String, TableDescriptor> map = null;
114    try {
115      map = htds.getAll();
116    } catch (IOException e) {
117      LOG.error("MobFileCleanerChore failed", e);
118      return;
119    }
120    for (TableDescriptor htd : map.values()) {
121      for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
122        if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
123          try {
124            cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
125          } catch (IOException e) {
126            LOG.error("Failed to clean the expired mob files table={} family={}",
127              htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
128          }
129        }
130      }
131      try {
132        // Now clean obsolete files for a table
133        LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
134        cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName());
135        LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
136      } catch (IOException e) {
137        LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
138      }
139    }
140  }
141
142  /**
143   * Performs housekeeping file cleaning (called by MOB Cleaner chore)
144   * @param conf  configuration
145   * @param table table name
146   * @throws IOException exception
147   */
148  public void cleanupObsoleteMobFiles(Configuration conf, TableName table) throws IOException {
149
150    long minAgeToArchive =
151      conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
152    // We check only those MOB files, which creation time is less
153    // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
154    // gives us full confidence that all corresponding store files will
155    // exist at the time cleaning procedure begins and will be examined.
156    // So, if MOB file creation time is greater than this maxTimeToArchive,
157    // this will be skipped and won't be archived.
158    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
159    try (final Connection conn = ConnectionFactory.createConnection(conf);
160      final Admin admin = conn.getAdmin();) {
161      TableDescriptor htd = admin.getDescriptor(table);
162      List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
163      if (list.size() == 0) {
164        LOG.info("Skipping non-MOB table [{}]", table);
165        return;
166      } else {
167        LOG.info("Only MOB files whose creation time older than {} will be archived, table={}",
168          maxCreationTimeToArchive, table);
169      }
170
171      FileSystem fs = FileSystem.get(conf);
172      Set<String> regionNames = new HashSet<>();
173      Path rootDir = CommonFSUtils.getRootDir(conf);
174      Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
175      List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
176
177      Set<String> allActiveMobFileName = new HashSet<String>();
178      for (Path regionPath : regionDirs) {
179        regionNames.add(regionPath.getName());
180        for (ColumnFamilyDescriptor hcd : list) {
181          String family = hcd.getNameAsString();
182          Path storePath = new Path(regionPath, family);
183          boolean succeed = false;
184          Set<String> regionMobs = new HashSet<String>();
185
186          while (!succeed) {
187            if (!fs.exists(storePath)) {
188              String errMsg = String.format("Directory %s was deleted during MOB file cleaner chore"
189                + " execution, aborting MOB file cleaner chore.", storePath);
190              throw new IOException(errMsg);
191            }
192            RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(storePath);
193            List<Path> storeFiles = new ArrayList<Path>();
194            // Load list of store files first
195            while (rit.hasNext()) {
196              Path p = rit.next().getPath();
197              if (fs.isFile(p)) {
198                storeFiles.add(p);
199              }
200            }
201            LOG.info("Found {} store files in: {}", storeFiles.size(), storePath);
202            Path currentPath = null;
203            try {
204              for (Path pp : storeFiles) {
205                currentPath = pp;
206                LOG.trace("Store file: {}", pp);
207                HStoreFile sf = null;
208                byte[] mobRefData = null;
209                byte[] bulkloadMarkerData = null;
210                try {
211                  sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
212                  sf.initReader();
213                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
214                  bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
215                  // close store file to avoid memory leaks
216                  sf.closeStoreFile(true);
217                } catch (IOException ex) {
218                  // When FileBased SFT is active the store dir can contain corrupted or incomplete
219                  // files. So read errors are expected. We just skip these files.
220                  if (ex instanceof FileNotFoundException) {
221                    throw ex;
222                  }
223                  LOG.debug("Failed to get mob data from file: {} due to error.", pp.toString(),
224                    ex);
225                  continue;
226                }
227                if (mobRefData == null) {
228                  if (bulkloadMarkerData == null) {
229                    LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
230                      + "can not proceed until all old files will be MOB-compacted.", pp);
231                    return;
232                  } else {
233                    LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
234                    continue;
235                  }
236                }
237                // file may or may not have MOB references, but was created by the distributed
238                // mob compaction code.
239                try {
240                  SetMultimap<TableName, String> mobs =
241                    MobUtils.deserializeMobFileRefs(mobRefData).build();
242                  LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
243                  LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
244                  regionMobs.addAll(mobs.values());
245                } catch (RuntimeException exception) {
246                  throw new IOException("failure getting mob references for hfile " + sf,
247                    exception);
248                }
249              }
250            } catch (FileNotFoundException e) {
251              LOG.warn(
252                "Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error",
253                currentPath, e);
254              regionMobs.clear();
255              continue;
256            }
257            succeed = true;
258          }
259
260          // Add MOB references for current region/family
261          allActiveMobFileName.addAll(regionMobs);
262        } // END column families
263      } // END regions
264      // Check if number of MOB files too big (over 1M)
265      if (allActiveMobFileName.size() > 1000000) {
266        LOG.warn("Found too many active MOB files: {}, table={}, "
267          + "this may result in high memory pressure.", allActiveMobFileName.size(), table);
268      }
269      LOG.debug("Found: {} active mob refs for table={}", allActiveMobFileName.size(), table);
270      allActiveMobFileName.stream().forEach(LOG::trace);
271
272      // Now scan MOB directories and find MOB files with no references to them
273      for (ColumnFamilyDescriptor hcd : list) {
274        List<Path> toArchive = new ArrayList<Path>();
275        String family = hcd.getNameAsString();
276        Path dir = MobUtils.getMobFamilyPath(conf, table, family);
277        RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
278        while (rit.hasNext()) {
279          LocatedFileStatus lfs = rit.next();
280          Path p = lfs.getPath();
281          String[] mobParts = p.getName().split("_");
282          String regionName = mobParts[mobParts.length - 1];
283
284          if (!regionNames.contains(regionName)) {
285            // MOB belonged to a region no longer hosted
286            long creationTime = fs.getFileStatus(p).getModificationTime();
287            if (creationTime < maxCreationTimeToArchive) {
288              LOG.trace("Archiving MOB file {} creation time={}", p,
289                (fs.getFileStatus(p).getModificationTime()));
290              toArchive.add(p);
291            } else {
292              LOG.trace("Skipping fresh file: {}. Creation time={}", p,
293                fs.getFileStatus(p).getModificationTime());
294            }
295          } else {
296            LOG.trace("Keeping MOB file with existing region: {}", p);
297          }
298        }
299        LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
300          table, family);
301        archiveMobFiles(conf, table, family.getBytes(), toArchive);
302        LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(), table,
303          family);
304      }
305    }
306  }
307
308  /**
309   * Archives the mob files.
310   * @param conf       The current configuration.
311   * @param tableName  The table name.
312   * @param family     The name of the column family.
313   * @param storeFiles The files to be archived.
314   * @throws IOException exception
315   */
316  public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
317    List<Path> storeFiles) throws IOException {
318
319    if (storeFiles.size() == 0) {
320      // nothing to remove
321      LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
322        Bytes.toString(family));
323      return;
324    }
325    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
326    FileSystem fs = storeFiles.get(0).getFileSystem(conf);
327
328    for (Path p : storeFiles) {
329      LOG.debug("MOB Cleaner is archiving: {}", p);
330      HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
331        family, p);
332    }
333  }
334}