/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

/**
 * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
 * (files which have no active references to) mob files.
 */
@InterfaceAudience.Private
public class MobFileCleanerChore extends ScheduledChore {

  private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class);
  private final HMaster master;
  private ExpiredMobFileCleaner cleaner;

  static {
    // Map the deprecated cleaner period key onto the current one.
    Configuration.addDeprecation(MobConstants.DEPRECATED_MOB_CLEANER_PERIOD,
      MobConstants.MOB_CLEANER_PERIOD);
  }

  /**
   * Creates the chore for the given master. The value of {@link MobConstants#MOB_CLEANER_PERIOD}
   * (in seconds) is passed to the {@link ScheduledChore} constructor twice, so the initial delay
   * and the period are the same.
   * @param master the master whose configuration and table descriptors are used
   */
  public MobFileCleanerChore(HMaster master) {
    super(master.getServerName() + "-MobFileCleanerChore", master,
      master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
      master.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
        MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
      TimeUnit.SECONDS);
    this.master = master;
    cleaner = new ExpiredMobFileCleaner();
    cleaner.setConf(master.getConfiguration());
    checkObsoleteConfigurations();
  }

  /**
   * Logs a warning for every obsolete MOB compaction setting that is still present in the master
   * configuration.
   */
  private void checkObsoleteConfigurations() {
    Configuration conf = master.getConfiguration();

    if (conf.get("hbase.mob.compaction.mergeable.threshold") != null) {
      LOG.warn("'hbase.mob.compaction.mergeable.threshold' is obsolete and not used anymore.");
    }
    if (conf.get("hbase.mob.delfile.max.count") != null) {
      LOG.warn("'hbase.mob.delfile.max.count' is obsolete and not used anymore.");
    }
    if (conf.get("hbase.mob.compaction.threads.max") != null) {
      LOG.warn("'hbase.mob.compaction.threads.max' is obsolete and not used anymore.");
    }
    if (conf.get("hbase.mob.compaction.batch.size") != null) {
      LOG.warn("'hbase.mob.compaction.batch.size' is obsolete and not used anymore.");
    }
  }

  /**
   * Test-only constructor. It leaves {@code master} null and does not create the expired-file
   * cleaner, so {@link #chore()} must not be invoked on an instance built this way.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public MobFileCleanerChore() {
    this.master = null;
  }

  /**
   * Runs one cleaning pass over every table known to the master: first the expired MOB files of
   * each MOB-enabled family with {@code minVersions == 0}, then the obsolete MOB files of the
   * table. A failure for one family or table is logged and does not stop the pass.
   */
  @Override
  protected void chore() {
    TableDescriptors htds = master.getTableDescriptors();

    final Map<String, TableDescriptor> map;
    try {
      map = htds.getAll();
    } catch (IOException e) {
      LOG.error("MobFileCleanerChore failed", e);
      return;
    }
    for (TableDescriptor htd : map.values()) {
      for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
        if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
          try {
            cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
          } catch (IOException e) {
            LOG.error("Failed to clean the expired mob files table={} family={}",
              htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
          }
        }
      }
      try {
        // Now clean obsolete files for a table
        LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
        cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName());
        LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
      } catch (IOException e) {
        LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
      }
    }
  }

  /**
   * Performs housekeeping file cleaning (called by MOB Cleaner chore).
   * <p>
   * The method reads the {@code MOB_FILE_REFS} metadata of every store file of every MOB family
   * in every region directory of the table, then scans each MOB family directory and archives
   * the MOB files whose region-name suffix (the part after the last {@code '_'} of the file name)
   * does not match any existing region directory and whose modification time is older than
   * {@code now - MobConstants.MIN_AGE_TO_ARCHIVE_KEY}.
   * <p>
   * Note that the set of active MOB references collected in the first phase is only logged by
   * this method; the archive decision itself is based on region name and file age. The method
   * returns without archiving anything if it finds a store file that has neither
   * {@code MOB_FILE_REFS} nor a bulk-load marker.
   * @param conf  configuration
   * @param table table name
   * @throws IOException if a store directory disappears during the scan, MOB references of a
   *                     store file cannot be deserialized, or a filesystem/admin call fails
   */
  public void cleanupObsoleteMobFiles(Configuration conf, TableName table) throws IOException {

    long minAgeToArchive =
      conf.getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
    // We check only those MOB files, which creation time is less
    // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
    // gives us full confidence that all corresponding store files will
    // exist at the time cleaning procedure begins and will be examined.
    // So, if MOB file creation time is greater than this maxTimeToArchive,
    // this will be skipped and won't be archived.
    long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() - minAgeToArchive;
    try (final Connection conn = ConnectionFactory.createConnection(conf);
      final Admin admin = conn.getAdmin()) {
      TableDescriptor htd = admin.getDescriptor(table);
      List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
      if (list.isEmpty()) {
        LOG.info("Skipping non-MOB table [{}]", table);
        return;
      } else {
        LOG.info("Only MOB files whose creation time older than {} will be archived, table={}",
          maxCreationTimeToArchive, table);
      }

      FileSystem fs = FileSystem.get(conf);
      Set<String> regionNames = new HashSet<>();
      Path rootDir = CommonFSUtils.getRootDir(conf);
      Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
      List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);

      Set<String> allActiveMobFileName = new HashSet<>();
      for (Path regionPath : regionDirs) {
        regionNames.add(regionPath.getName());
        for (ColumnFamilyDescriptor hcd : list) {
          String family = hcd.getNameAsString();
          Path storePath = new Path(regionPath, family);
          boolean succeed = false;
          Set<String> regionMobs = new HashSet<>();

          // Retry the whole store directory whenever a listed file vanishes mid-scan.
          while (!succeed) {
            if (!fs.exists(storePath)) {
              String errMsg = String.format("Directory %s was deleted during MOB file cleaner chore"
                + " execution, aborting MOB file cleaner chore.", storePath);
              throw new IOException(errMsg);
            }
            RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(storePath);
            List<Path> storeFiles = new ArrayList<>();
            // Load list of store files first
            while (rit.hasNext()) {
              Path p = rit.next().getPath();
              if (fs.isFile(p)) {
                storeFiles.add(p);
              }
            }
            LOG.info("Found {} store files in: {}", storeFiles.size(), storePath);
            Path currentPath = null;
            try {
              for (Path pp : storeFiles) {
                currentPath = pp;
                LOG.trace("Store file: {}", pp);
                HStoreFile sf = null;
                byte[] mobRefData = null;
                byte[] bulkloadMarkerData = null;
                try {
                  sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED, BloomType.NONE, true);
                  sf.initReader();
                  mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
                  bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
                  // close store file to avoid memory leaks
                  sf.closeStoreFile(true);
                } catch (IOException ex) {
                  // When FileBased SFT is active the store dir can contain corrupted or incomplete
                  // files. So read errors are expected. We just skip these files.
                  // A missing file is different: propagate it so the outer handler restarts
                  // the scan of this store directory.
                  if (ex instanceof FileNotFoundException) {
                    throw ex;
                  }
                  LOG.debug("Failed to get mob data from file: {} due to error.", pp, ex);
                  continue;
                }
                if (mobRefData == null) {
                  if (bulkloadMarkerData == null) {
                    LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
                      + "can not proceed until all old files will be MOB-compacted.", pp);
                    return;
                  } else {
                    LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
                    continue;
                  }
                }
                // file may or may not have MOB references, but was created by the distributed
                // mob compaction code.
                try {
                  SetMultimap<TableName, String> mobs =
                    MobUtils.deserializeMobFileRefs(mobRefData).build();
                  LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
                  LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
                  regionMobs.addAll(mobs.values());
                } catch (RuntimeException exception) {
                  throw new IOException("failure getting mob references for hfile " + sf,
                    exception);
                }
              }
            } catch (FileNotFoundException e) {
              LOG.warn(
                "Missing file:{} Starting MOB cleaning cycle from the beginning" + " due to error",
                currentPath, e);
              regionMobs.clear();
              continue;
            }
            succeed = true;
          }

          // Add MOB references for current region/family
          allActiveMobFileName.addAll(regionMobs);
        } // END column families
      } // END regions
      // Check if number of MOB files too big (over 1M)
      if (allActiveMobFileName.size() > 1000000) {
        LOG.warn("Found too many active MOB files: {}, table={}, "
          + "this may result in high memory pressure.", allActiveMobFileName.size(), table);
      }
      LOG.debug("Found: {} active mob refs for table={}", allActiveMobFileName.size(), table);
      // The set can be very large; do not walk it at all unless TRACE is really enabled.
      if (LOG.isTraceEnabled()) {
        allActiveMobFileName.forEach(LOG::trace);
      }

      // Now scan MOB directories and find MOB files with no references to them
      for (ColumnFamilyDescriptor hcd : list) {
        List<Path> toArchive = new ArrayList<>();
        String family = hcd.getNameAsString();
        Path dir = MobUtils.getMobFamilyPath(conf, table, family);
        RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
        while (rit.hasNext()) {
          LocatedFileStatus lfs = rit.next();
          Path p = lfs.getPath();
          // The region name is taken from the last '_'-separated part of the MOB file name.
          String[] mobParts = p.getName().split("_");
          String regionName = mobParts[mobParts.length - 1];

          if (!regionNames.contains(regionName)) {
            // MOB belonged to a region no longer hosted
            long creationTime = fs.getFileStatus(p).getModificationTime();
            if (creationTime < maxCreationTimeToArchive) {
              // Reuse the value fetched above instead of asking the filesystem again just to
              // build a log argument.
              LOG.trace("Archiving MOB file {} creation time={}", p, creationTime);
              toArchive.add(p);
            } else {
              LOG.trace("Skipping fresh file: {}. Creation time={}", p, creationTime);
            }
          } else {
            LOG.trace("Keeping MOB file with existing region: {}", p);
          }
        }
        LOG.info(" MOB Cleaner found {} files to archive for table={} family={}", toArchive.size(),
          table, family);
        // Pass the descriptor's own name bytes; String.getBytes() without a charset would depend
        // on the platform default encoding.
        archiveMobFiles(conf, table, hcd.getName(), toArchive);
        LOG.info(" MOB Cleaner archived {} files, table={} family={}", toArchive.size(), table,
          family);
      }
    }
  }

  /**
   * Archives the mob files, one {@link HFileArchiver#archiveStoreFile} call per file. Does
   * nothing (apart from a debug log) when the list is empty.
   * @param conf       The current configuration.
   * @param tableName  The table name.
   * @param family     The name of the column family.
   * @param storeFiles The files to be archived.
   * @throws IOException exception
   */
  public void archiveMobFiles(Configuration conf, TableName tableName, byte[] family,
    List<Path> storeFiles) throws IOException {

    if (storeFiles.isEmpty()) {
      // nothing to remove
      LOG.debug("Skipping archiving old MOB files - no files found for table={} cf={}", tableName,
        Bytes.toString(family));
      return;
    }
    Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
    // The filesystem is resolved from the first file and used for all files in the list.
    FileSystem fs = storeFiles.get(0).getFileSystem(conf);

    for (Path p : storeFiles) {
      LOG.debug("MOB Cleaner is archiving: {}", p);
      HFileArchiver.archiveStoreFile(conf, fs, MobUtils.getMobRegionInfo(tableName), mobTableDir,
        family, p);
    }
  }
}