/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * FIFO compaction policy selects only files whose cells have all expired. The column family MUST
 * have a non-default TTL. One use case for this policy is storing raw data which will be
 * post-processed later and discarded completely after a fairly short period of time, e.g. raw
 * time-series data versus time-based roll-up aggregates and compacted time-series. We collect the
 * raw time-series and store it into a CF with FIFO compaction policy, then periodically run a task
 * which creates the roll-up aggregates and compacts the time-series; the original raw data can be
 * discarded after that.
 */
@InterfaceAudience.Private
public class FIFOCompactionPolicy extends ExploringCompactionPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(FIFOCompactionPolicy.class);

  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
    super(conf, storeConfigInfo);
  }
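  /*
   * Usage sketch (illustrative): FIFO compaction is enabled per column family by pointing the
   * default store engine's compaction policy at this class and setting a non-default TTL, e.g.
   * from the HBase shell. The table name 'raw_timeseries' and family 'd' below are hypothetical;
   * the configuration key and this class's fully qualified name are real.
   *
   *   hbase> create 'raw_timeseries', {NAME => 'd', TTL => '86400'},
   *     {CONFIGURATION => {'hbase.hstore.defaultengine.compactionpolicy.class' =>
   *       'org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy'}}
   */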
  @Override
  public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles,
    List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
    boolean forceMajor) throws IOException {
    if (forceMajor) {
      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignoring the flag.");
    }
    boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating selection to the parent policy.");
      return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
        mayUseOffPeak, forceMajor);
    }

    // Select only the fully expired (or empty) files; if there are none,
    // the request is empty and nothing will be compacted.
    Collection<HStoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
    return new CompactionRequestImpl(toCompact);
  }

  @Override
  public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
    throws IOException {
    boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating to the parent policy.");
      return super.shouldPerformMajorCompaction(filesToCompact);
    }
    return false;
  }

  @Override
  public boolean needsCompaction(Collection<HStoreFile> storeFiles,
    List<HStoreFile> filesCompacting) {
    boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
    if (isAfterSplit) {
      LOG.info("Split detected, delegating to the parent policy.");
      return super.needsCompaction(storeFiles, filesCompacting);
    }
    return hasExpiredStores(storeFiles);
  }

  /**
   * FIFOCompactionPolicy chooses only TTL-expired store files as compaction candidates. If all
   * the store files are TTL expired, the compaction generates a new empty file whose max
   * timestamp is Long.MAX_VALUE. If not handled separately, that store file would never be
   * archived, because its TTL would never expire. So we check for empty store files separately
   * (see HBASE-21504).
   */
  private boolean isEmptyStoreFile(HStoreFile sf) {
    return sf.getReader().getEntries() == 0;
  }

  private boolean hasExpiredStores(Collection<HStoreFile> files) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    for (HStoreFile sf : files) {
      if (isEmptyStoreFile(sf)) {
        return true;
      }
      // The MIN_VERSIONS check is done in HStore#removeUnneededFiles.
      long maxTs = sf.getReader().getMaxTimestamp();
      long maxTtl = storeConfigInfo.getStoreFileTtl();
      // A file is fully expired when a TTL is set and its newest cell is older than now - TTL.
      if (maxTtl != Long.MAX_VALUE && currentTime - maxTtl >= maxTs) {
        return true;
      }
    }
    return false;
  }

  private Collection<HStoreFile> getExpiredStores(Collection<HStoreFile> files,
    Collection<HStoreFile> filesCompacting) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    Collection<HStoreFile> expiredStores = new ArrayList<>();
    for (HStoreFile sf : files) {
      // filesCompacting may be null; treat null as "nothing is currently compacting".
      boolean isCompacting = filesCompacting != null && filesCompacting.contains(sf);
      if (isEmptyStoreFile(sf) && !isCompacting) {
        expiredStores.add(sf);
        continue;
      }
      // The MIN_VERSIONS check is done in HStore#removeUnneededFiles.
      long maxTs = sf.getReader().getMaxTimestamp();
      long maxTtl = storeConfigInfo.getStoreFileTtl();
      if (maxTtl != Long.MAX_VALUE && currentTime - maxTtl >= maxTs && !isCompacting) {
        expiredStores.add(sf);
      }
    }
    return expiredStores;
  }
}