001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.util.concurrent.atomic.AtomicBoolean; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.MemoryCompactionPolicy; 027import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 028import org.apache.hadoop.hbase.util.Bytes; 029import org.apache.hadoop.hbase.util.ClassSize; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 035 036/** 037 * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts 038 * the compaction if requested. The compaction is interrupted and stopped by CompactingMemStore, 039 * for example when another compaction needs to be started. 040 * Prior to compaction the MemStoreCompactor evaluates 041 * the compacting ratio and aborts the compaction if it is not worthy. 042 * The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner 043 * is included in internal store scanner, where all compaction logic is implemented. 044 * Threads safety: It is assumed that the compaction pipeline is immutable, 045 * therefore no special synchronization is required. 046 */ 047@InterfaceAudience.Private 048public class MemStoreCompactor { 049 050 public static final long DEEP_OVERHEAD = ClassSize 051 .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE 052 // compactingMemStore, versionedList, isInterrupted, strategy (the reference) 053 // "action" is an enum and thus it is a class with static final constants, 054 // so counting only the size of the reference to it and not the size of the internals 055 + Bytes.SIZEOF_INT // compactionKVMax 056 + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals) 057 ); 058 059 private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class); 060 private CompactingMemStore compactingMemStore; 061 062 // a static version of the segment list from the pipeline 063 private VersionedSegmentsList versionedList; 064 065 // a flag raised when compaction is requested to stop 066 private final AtomicBoolean isInterrupted = new AtomicBoolean(false); 067 068 // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator 069 private final int compactionKVMax; 070 071 private MemStoreCompactionStrategy strategy; 072 073 public MemStoreCompactor(CompactingMemStore compactingMemStore, 074 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 075 this.compactingMemStore = compactingMemStore; 076 this.compactionKVMax = compactingMemStore.getConfiguration() 077 .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); 078 initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(), 079 compactingMemStore.getFamilyName()); 080 } 081 082 @Override 083 public String toString() { 084 return this.strategy + ", compactionCellMax=" + this.compactionKVMax; 085 } 086 087 /**---------------------------------------------------------------------- 088 * The request to dispatch the compaction asynchronous task. 089 * The method returns true if compaction was successfully dispatched, or false if there 090 * is already an ongoing compaction or no segments to compact. 091 */ 092 public boolean start() throws IOException { 093 if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline 094 return false; 095 } 096 097 // get a snapshot of the list of the segments from the pipeline, 098 // this local copy of the list is marked with specific version 099 versionedList = compactingMemStore.getImmutableSegments(); 100 LOG.trace("Speculative compaction starting on {}/{}", 101 compactingMemStore.getStore().getHRegion().getRegionInfo().getEncodedName(), 102 compactingMemStore.getStore().getColumnFamilyName()); 103 HStore store = compactingMemStore.getStore(); 104 RegionCoprocessorHost cpHost = store.getCoprocessorHost(); 105 if (cpHost != null) { 106 cpHost.preMemStoreCompaction(store); 107 } 108 try { 109 doCompaction(); 110 } finally { 111 if (cpHost != null) { 112 cpHost.postMemStoreCompaction(store); 113 } 114 } 115 return true; 116 } 117 118 /**---------------------------------------------------------------------- 119 * The request to cancel the compaction asynchronous task 120 * The compaction may still happen if the request was sent too late 121 * Non-blocking request 122 */ 123 public void stop() { 124 isInterrupted.compareAndSet(false, true); 125 } 126 127 128 public void resetStats() { 129 strategy.resetStats(); 130 } 131 132 /**---------------------------------------------------------------------- 133 * Reset the interruption indicator and clear the pointers in order to allow good 134 * garbage collection 135 */ 136 private void releaseResources() { 137 isInterrupted.set(false); 138 versionedList = null; 139 } 140 141 /**---------------------------------------------------------------------- 142 * The worker thread performs the compaction asynchronously. 143 * The solo (per compactor) thread only reads the compaction pipeline. 144 * There is at most one thread per memstore instance. 145 */ 146 private void doCompaction() { 147 ImmutableSegment result = null; 148 boolean resultSwapped = false; 149 if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening 150 return; // the compaction also doesn't start when interrupted 151 } 152 153 MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList); 154 boolean merge = 155 (nextStep == MemStoreCompactionStrategy.Action.MERGE || 156 nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS); 157 try { 158 if (nextStep == MemStoreCompactionStrategy.Action.NOOP) { 159 return; 160 } 161 if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN || 162 nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) { 163 // some Segment in the pipeline is with SkipList index, make it flat 164 compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep); 165 return; 166 } 167 168 // Create one segment representing all segments in the compaction pipeline, 169 // either by compaction or by merge 170 if (!isInterrupted.get()) { 171 result = createSubstitution(nextStep); 172 } 173 174 // Substitute the pipeline with one segment 175 if (!isInterrupted.get()) { 176 resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge); 177 if (resultSwapped) { 178 // update compaction strategy 179 strategy.updateStats(result); 180 // update the wal so it can be truncated and not get too long 181 compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater 182 } 183 } 184 } catch (IOException e) { 185 LOG.trace("Interrupting in-memory compaction for store={}", 186 compactingMemStore.getFamilyName()); 187 Thread.currentThread().interrupt(); 188 } finally { 189 // For the MERGE case, if the result was created, but swap didn't happen, 190 // we DON'T need to close the result segment (meaning its MSLAB)! 191 // Because closing the result segment means closing the chunks of all segments 192 // in the compaction pipeline, which still have ongoing scans. 193 if (!merge && (result != null) && !resultSwapped) { 194 result.close(); 195 } 196 releaseResources(); 197 } 198 199 } 200 201 /**---------------------------------------------------------------------- 202 * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the 203 * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. 204 */ 205 private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws 206 IOException { 207 208 ImmutableSegment result = null; 209 MemStoreSegmentsIterator iterator = null; 210 211 switch (action) { 212 case COMPACT: 213 iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(), 214 compactingMemStore.getComparator(), 215 compactionKVMax, compactingMemStore.getStore()); 216 217 result = SegmentFactory.instance().createImmutableSegmentByCompaction( 218 compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, 219 versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action); 220 iterator.close(); 221 break; 222 case MERGE: 223 case MERGE_COUNT_UNIQUE_KEYS: 224 iterator = 225 new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), 226 compactingMemStore.getComparator(), compactionKVMax); 227 228 result = SegmentFactory.instance().createImmutableSegmentByMerge( 229 compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, 230 versionedList.getNumOfCells(), versionedList.getStoreSegments(), 231 compactingMemStore.getIndexType(), action); 232 iterator.close(); 233 break; 234 default: 235 throw new RuntimeException("Unknown action " + action); // sanity check 236 } 237 238 return result; 239 } 240 241 @VisibleForTesting 242 void initiateCompactionStrategy(MemoryCompactionPolicy compType, 243 Configuration configuration, String cfName) throws IllegalArgumentIOException { 244 245 assert (compType !=MemoryCompactionPolicy.NONE); 246 247 switch (compType){ 248 case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName); 249 break; 250 case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName); 251 break; 252 case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName); 253 break; 254 default: 255 // sanity check 256 throw new IllegalArgumentIOException("Unknown memory compaction type " + compType); 257 } 258 } 259}