001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver.compactions; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.List; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.regionserver.HStoreFile; 028import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; 029import org.apache.hadoop.hbase.regionserver.StoreUtils; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * 037 * FIFO compaction policy selects only files which have all cells expired. 038 * The column family MUST have non-default TTL. One of the use cases for this 039 * policy is when we need to store raw data which will be post-processed later 040 * and discarded completely after quite short period of time. Raw time-series vs. 041 * time-based roll up aggregates and compacted time-series. We collect raw time-series 042 * and store them into CF with FIFO compaction policy, periodically we run task 043 * which creates roll up aggregates and compacts time-series, the original raw data 044 * can be discarded after that. 045 * 046 */ 047@InterfaceAudience.Private 048public class FIFOCompactionPolicy extends ExploringCompactionPolicy { 049 050 private static final Logger LOG = LoggerFactory.getLogger(FIFOCompactionPolicy.class); 051 052 053 public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) { 054 super(conf, storeConfigInfo); 055 } 056 057 @Override 058 public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles, 059 List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, 060 boolean forceMajor) throws IOException { 061 if(forceMajor){ 062 LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignore the flag."); 063 } 064 boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles); 065 if(isAfterSplit){ 066 LOG.info("Split detected, delegate selection to the parent policy."); 067 return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction, 068 mayUseOffPeak, forceMajor); 069 } 070 071 // Nothing to compact 072 Collection<HStoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting); 073 CompactionRequestImpl result = new CompactionRequestImpl(toCompact); 074 return result; 075 } 076 077 @Override 078 public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact) 079 throws IOException { 080 boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact); 081 if(isAfterSplit){ 082 LOG.info("Split detected, delegate to the parent policy."); 083 return super.shouldPerformMajorCompaction(filesToCompact); 084 } 085 return false; 086 } 087 088 @Override 089 public boolean needsCompaction(Collection<HStoreFile> storeFiles, 090 List<HStoreFile> filesCompacting) { 091 boolean isAfterSplit = StoreUtils.hasReferences(storeFiles); 092 if(isAfterSplit){ 093 LOG.info("Split detected, delegate to the parent policy."); 094 return super.needsCompaction(storeFiles, filesCompacting); 095 } 096 return hasExpiredStores(storeFiles); 097 } 098 099 /** 100 * The FIFOCompactionPolicy only choose those TTL expired HFiles as the compaction candidates. So 101 * if all HFiles are TTL expired, then the compaction will generate a new empty HFile. While its 102 * max timestamp will be Long.MAX_VALUE. If not considered separately, the HFile will never be 103 * archived because its TTL will be never expired. So we'll check the empty store file separately. 104 * (See HBASE-21504) 105 */ 106 private boolean isEmptyStoreFile(HStoreFile sf) { 107 return sf.getReader().getEntries() == 0; 108 } 109 110 private boolean hasExpiredStores(Collection<HStoreFile> files) { 111 long currentTime = EnvironmentEdgeManager.currentTime(); 112 for (HStoreFile sf : files) { 113 if (isEmptyStoreFile(sf)) { 114 return true; 115 } 116 // Check MIN_VERSIONS is in HStore removeUnneededFiles 117 long maxTs = sf.getReader().getMaxTimestamp(); 118 long maxTtl = storeConfigInfo.getStoreFileTtl(); 119 if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) { 120 continue; 121 } else { 122 return true; 123 } 124 } 125 return false; 126 } 127 128 private Collection<HStoreFile> getExpiredStores(Collection<HStoreFile> files, 129 Collection<HStoreFile> filesCompacting) { 130 long currentTime = EnvironmentEdgeManager.currentTime(); 131 Collection<HStoreFile> expiredStores = new ArrayList<>(); 132 for (HStoreFile sf : files) { 133 if (isEmptyStoreFile(sf)) { 134 expiredStores.add(sf); 135 continue; 136 } 137 // Check MIN_VERSIONS is in HStore removeUnneededFiles 138 long maxTs = sf.getReader().getMaxTimestamp(); 139 long maxTtl = storeConfigInfo.getStoreFileTtl(); 140 if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) { 141 continue; 142 } else if (filesCompacting == null || !filesCompacting.contains(sf)) { 143 expiredStores.add(sf); 144 } 145 } 146 return expiredStores; 147 } 148}