001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ThreadLocalRandom;
030import java.util.concurrent.TimeUnit;
031import java.util.stream.Collectors;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.LocatedFileStatus;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.fs.RemoteIterator;
037import org.apache.hadoop.hbase.ScheduledChore;
038import org.apache.hadoop.hbase.TableDescriptors;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.backup.HFileArchiver;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.regionserver.HStore;
046import org.apache.hadoop.hbase.regionserver.HStoreFile;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.CommonFSUtils;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
055
056/**
057 * The class RSMobFileCleanerChore for running cleaner regularly to remove the obsolete (files which
058 * have no active references to) mob files that were referenced from the current RS.
059 */
060@InterfaceAudience.Private
061public class RSMobFileCleanerChore extends ScheduledChore {
062
063  private static final Logger LOG = LoggerFactory.getLogger(RSMobFileCleanerChore.class);
064  private final HRegionServer rs;
065
066  public RSMobFileCleanerChore(HRegionServer rs) {
067    super(rs.getServerName() + "-MobFileCleanerChore", rs,
068      rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
069        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
070      Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
071        MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
072        * ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
073      TimeUnit.SECONDS);
074    // to prevent a load spike on the fs the initial delay is modified by +/- 50%
075    this.rs = rs;
076  }
077
078  public RSMobFileCleanerChore() {
079    this.rs = null;
080  }
081
082  @Override
083  protected void chore() {
084
085    long minAgeToArchive = rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
086      MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
087    // We check only those MOB files, which creation time is less
088    // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
089    // gives us full confidence that all corresponding store files will
090    // exist at the time cleaning procedure begins and will be examined.
091    // So, if MOB file creation time is greater than this maxTimeToArchive,
092    // this will be skipped and won't be archived.
093    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
094
095    TableDescriptors htds = rs.getTableDescriptors();
096    try {
097      FileSystem fs = FileSystem.get(rs.getConfiguration());
098
099      Map<String, TableDescriptor> map = null;
100      try {
101        map = htds.getAll();
102      } catch (IOException e) {
103        LOG.error("MobFileCleanerChore failed", e);
104        return;
105      }
106      Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>();
107      for (TableDescriptor htd : map.values()) {
108        // Now clean obsolete files for a table
109        LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
110        List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
111        List<HRegion> regions = rs.getRegions(htd.getTableName());
112        for (HRegion region : regions) {
113          for (ColumnFamilyDescriptor hcd : list) {
114            HStore store = region.getStore(hcd.getName());
115            Collection<HStoreFile> sfs = store.getStorefiles();
116            Set<String> regionMobs = new HashSet<String>();
117            Path currentPath = null;
118            try {
119              // collectinng referenced MOBs
120              for (HStoreFile sf : sfs) {
121                currentPath = sf.getPath();
122                sf.initReader();
123                byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
124                byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
125                // close store file to avoid memory leaks
126                sf.closeStoreFile(true);
127                if (mobRefData == null) {
128                  if (bulkloadMarkerData == null) {
129                    LOG.warn(
130                      "Found old store file with no MOB_FILE_REFS: {} - "
131                        + "can not proceed until all old files will be MOB-compacted.",
132                      currentPath);
133                    return;
134                  } else {
135                    LOG.debug("Skipping file without MOB references (bulkloaded file):{}",
136                      currentPath);
137                    continue;
138                  }
139                }
140                // file may or may not have MOB references, but was created by the distributed
141                // mob compaction code.
142                try {
143                  SetMultimap<TableName, String> mobs =
144                    MobUtils.deserializeMobFileRefs(mobRefData).build();
145                  LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
146                  LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
147                  regionMobs.addAll(mobs.values());
148                } catch (RuntimeException exception) {
149                  throw new IOException("failure getting mob references for hfile " + sf,
150                    exception);
151                }
152              }
153              // collecting files, MOB included currently being written
154              regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
155                .map(path -> path.getName()).collect(Collectors.toList()));
156
157              referencedMOBs
158                .computeIfAbsent(hcd.getNameAsString(), cf -> new HashMap<String, List<String>>())
159                .computeIfAbsent(region.getRegionInfo().getEncodedName(), name -> new ArrayList<>())
160                .addAll(regionMobs);
161
162            } catch (FileNotFoundException e) {
163              LOG.warn(
164                "Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error",
165                currentPath, e);
166              regionMobs.clear();
167              continue;
168            } catch (IOException e) {
169              LOG.error("Failed to clean the obsolete mob files for table={}",
170                htd.getTableName().getNameAsString(), e);
171            }
172          }
173        }
174
175        if (LOG.isDebugEnabled()) {
176          LOG.debug("Found: {} active mob refs for table={}",
177            referencedMOBs.values().stream().map(inner -> inner.values())
178              .flatMap(lists -> lists.stream()).mapToInt(lists -> lists.size()).sum(),
179            htd.getTableName().getNameAsString());
180        }
181        if (LOG.isTraceEnabled()) {
182          referencedMOBs.values().stream().forEach(innerMap -> innerMap.values().stream()
183            .forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace)));
184        }
185
186        // collect regions referencing MOB files belonging to the current rs
187        Set<String> regionsCovered = new HashSet<>();
188        referencedMOBs.values().stream()
189          .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
190
191        for (ColumnFamilyDescriptor hcd : list) {
192          List<Path> toArchive = new ArrayList<Path>();
193          String family = hcd.getNameAsString();
194          Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(), htd.getTableName(), family);
195          RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
196          while (rit.hasNext()) {
197            LocatedFileStatus lfs = rit.next();
198            Path p = lfs.getPath();
199            String[] mobParts = p.getName().split("_");
200            String regionName = mobParts[mobParts.length - 1];
201
202            // skip MOB files not belonging to a region assigned to the current rs
203            if (!regionsCovered.contains(regionName)) {
204              LOG.trace("MOB file does not belong to current rs: {}", p);
205              continue;
206            }
207
208            // check active or actively written mob files
209            Map<String, List<String>> cfMobs = referencedMOBs.get(hcd.getNameAsString());
210            if (
211              cfMobs != null && cfMobs.get(regionName) != null
212                && cfMobs.get(regionName).contains(p.getName())
213            ) {
214              LOG.trace("Keeping active MOB file: {}", p);
215              continue;
216            }
217
218            // MOB is not in a list of active references, but it can be too
219            // fresh, skip it in this case
220            long creationTime = fs.getFileStatus(p).getModificationTime();
221            if (creationTime < maxCreationTimeToArchive) {
222              LOG.trace("Archiving MOB file {} creation time={}", p,
223                (fs.getFileStatus(p).getModificationTime()));
224              toArchive.add(p);
225            } else {
226              LOG.trace("Skipping fresh file: {}. Creation time={}", p,
227                fs.getFileStatus(p).getModificationTime());
228            }
229
230          }
231          LOG.info(" MOB Cleaner found {} files to archive for table={} family={}",
232            toArchive.size(), htd.getTableName().getNameAsString(), family);
233          archiveMobFiles(rs.getConfiguration(), htd.getTableName(), family.getBytes(), toArchive);
234          LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(),
235            htd.getTableName().getNameAsString(), family);
236        }
237
238        LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
239
240      }
241    } catch (IOException e) {
242      LOG.error("MOB Cleaner failed when trying to access the file system", e);
243    }
244  }
245
246  /**
247   * Archives the mob files.
248   * @param conf       The current configuration.
249   * @param tableName  The table name.
250   * @param family     The name of the column family.
251   * @param storeFiles The files to be archived.
252   * @throws IOException exception
253   */
254  public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
255    List<Path> storeFiles) throws IOException {
256
257    if (storeFiles.size() == 0) {
258      // nothing to remove
259      LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
260        Bytes.toString(family));
261      return;
262    }
263    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
264    FileSystem fs = storeFiles.get(0).getFileSystem(conf);
265
266    for (Path p : storeFiles) {
267      LOG.debug("MOB Cleaner is archiving: {}", p);
268      HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
269        family, p);
270    }
271  }
272}