/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts
 * the compaction if requested. The compaction is interrupted and stopped by CompactingMemStore,
 * for example when another compaction needs to be started.
 * Prior to compaction the MemStoreCompactor evaluates
 * the compacting ratio and aborts the compaction if it is not worthy.
 * The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner
 * is included in internal store scanner, where all compaction logic is implemented.
 * Threads safety: It is assumed that the compaction pipeline is immutable,
 * therefore no special synchronization is required.
 */
@InterfaceAudience.Private
public class MemStoreCompactor {

  /**
   * Heap overhead of one instance: the object header, four references
   * (compactingMemStore, versionedList, isInterrupted, strategy), the int compactionKVMax and
   * the internals of the AtomicBoolean. Only the reference to the strategy is counted here,
   * not the strategy object itself.
   */
  public static final long DEEP_OVERHEAD = ClassSize
      .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
          // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
          + Bytes.SIZEOF_INT        // compactionKVMax
          + ClassSize.ATOMIC_BOOLEAN    // isInterrupted (the internals)
      );

  private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class);
  private CompactingMemStore compactingMemStore;

  // a static version of the segment list from the pipeline
  private VersionedSegmentsList versionedList;

  // a flag raised when compaction is requested to stop
  private final AtomicBoolean isInterrupted = new AtomicBoolean(false);

  // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
  private final int compactionKVMax;

  private MemStoreCompactionStrategy strategy;

  /**
   * Creates a compactor bound to the given memstore.
   * The cell batch limit is read from {@link HConstants#COMPACTION_KV_MAX} in the memstore's
   * configuration, and the compaction strategy is chosen according to the given policy.
   *
   * @param compactingMemStore the memstore whose pipeline this compactor works on
   * @param compactionPolicy the in-memory compaction policy selecting the strategy
   * @throws IllegalArgumentIOException if the policy does not map to a known strategy
   */
  public MemStoreCompactor(CompactingMemStore compactingMemStore,
      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
    this.compactingMemStore = compactingMemStore;
    this.compactionKVMax = compactingMemStore.getConfiguration()
        .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
        compactingMemStore.getFamilyName());
  }

  @Override
  public String toString() {
    return this.strategy + ", compactionCellMax=" + this.compactionKVMax;
  }

  /**
   * The request to run an in-memory compaction over the current pipeline.
   * The compaction itself is executed by this call, in the calling thread
   * (NOTE(review): callers are presumably background workers - confirm against the caller).
   * The coprocessor host of the store, when there is one, is notified before the compaction
   * and, in all cases, after it.
   *
   * @return false if the pipeline holds no immutable segments (nothing to compact);
   *         true once the compaction attempt has finished, including the cases where the
   *         strategy decided to do nothing or the compaction was interrupted
   * @throws IOException declared for callers; see the coprocessor hooks
   */
  public boolean start() throws IOException {
    if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
      return false;
    }

    // get a snapshot of the list of the segments from the pipeline,
    // this local copy of the list is marked with specific version
    versionedList = compactingMemStore.getImmutableSegments();
    HStore store = compactingMemStore.getStore();
    // guard the message: its arguments walk store -> region -> region info and are
    // otherwise evaluated even when trace logging is switched off
    if (LOG.isTraceEnabled()) {
      LOG.trace("Speculative compaction starting on {}/{}",
          store.getHRegion().getRegionInfo().getEncodedName(),
          store.getColumnFamilyName());
    }
    RegionCoprocessorHost cpHost = store.getCoprocessorHost();
    if (cpHost != null) {
      cpHost.preMemStoreCompaction(store);
    }
    try {
      doCompaction();
    } finally {
      // the post hook must fire even if the compaction failed
      if (cpHost != null) {
        cpHost.postMemStoreCompaction(store);
      }
    }
    return true;
  }

  /**
   * The request to cancel the compaction.
   * Only raises the interruption flag, so the call is non-blocking. The compaction checks the
   * flag at a few points, therefore it may still complete if the request was sent too late.
   */
  public void stop() {
    isInterrupted.compareAndSet(false, true);
  }

  /**
   * Resets the statistics kept by the compaction strategy.
   */
  public void resetStats() {
    strategy.resetStats();
  }

  /**
   * Reset the interruption indicator and clear the pointers in order to allow good
   * garbage collection.
   */
  private void releaseResources() {
    isInterrupted.set(false);
    versionedList = null;
  }

  /**
   * Performs one compaction step over the snapshot taken in {@link #start()}.
   * The strategy picks the action: do nothing, flatten a single segment, or replace the
   * segments of the snapshot by one segment built by merge or by copy-compaction.
   * The interruption flag is consulted before every stage; an interrupted compaction simply
   * stops. Whatever happens, the snapshot reference and the interruption flag are reset and
   * the memstore is told that the in-memory compaction is completed.
   */
  private void doCompaction() {
    ImmutableSegment result = null;
    boolean resultSwapped = false;
    MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
    boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE ||
        nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
    try {
      if (isInterrupted.get()) {      // if the entire process is interrupted cancel flattening
        return;           // the compaction also doesn't start when interrupted
      }

      if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
        return;
      }
      if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN
          || nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
        // some Segment in the pipeline is with SkipList index, make it flat
        compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
        return;
      }

      // Create one segment representing all segments in the compaction pipeline,
      // either by compaction or by merge
      if (!isInterrupted.get()) {
        result = createSubstitution(nextStep);
      }

      // Substitute the pipeline with one segment
      if (!isInterrupted.get()) {
        resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
        if (resultSwapped) {
          // update compaction strategy
          strategy.updateStats(result);
          // update the wal so it can be truncated and not get too long
          compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
        }
      }
    } catch (IOException e) {
      // keep the cause visible: pass the exception as the trailing logger argument
      LOG.trace("Interrupting in-memory compaction for store={}",
          compactingMemStore.getFamilyName(), e);
      Thread.currentThread().interrupt();
    } finally {
      // For the MERGE case, if the result was created, but swap didn't happen,
      // we DON'T need to close the result segment (meaning its MSLAB)!
      // Because closing the result segment means closing the chunks of all segments
      // in the compaction pipeline, which still have ongoing scans.
      if (!merge && (result != null) && !resultSwapped) {
        result.close();
      }
      releaseResources();
      compactingMemStore.setInMemoryCompactionCompleted();
    }

  }

  /**
   * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
   * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
   * The iterator over the segments is closed before returning, also when building the new
   * segment fails.
   *
   * @param action the action chosen by the strategy; COMPACT, MERGE and
   *               MERGE_COUNT_UNIQUE_KEYS are supported
   * @return the segment that is to replace the segments of the snapshot
   * @throws IOException if the iterator or the new segment cannot be created
   * @throws RuntimeException if the action is none of the supported ones
   */
  private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws
      IOException {

    ImmutableSegment result = null;
    MemStoreSegmentsIterator iterator = null;
    List<ImmutableSegment> segments = versionedList.getStoreSegments();
    for (ImmutableSegment s : segments) {
      s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
    }

    try {
      switch (action) {
        case COMPACT:
          iterator = new MemStoreCompactorSegmentsIterator(segments,
              compactingMemStore.getComparator(),
              compactionKVMax, compactingMemStore.getStore());

          result = SegmentFactory.instance().createImmutableSegmentByCompaction(
              compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
              versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
          break;
        case MERGE:
        case MERGE_COUNT_UNIQUE_KEYS:
          iterator =
              new MemStoreMergerSegmentsIterator(segments,
                  compactingMemStore.getComparator(), compactionKVMax);

          result = SegmentFactory.instance().createImmutableSegmentByMerge(
              compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
              versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
          break;
        default:
          throw new RuntimeException("Unknown action " + action); // sanity check
      }
    } finally {
      // release the iterator on every path, not only after a successful build;
      // it is still null when the action was rejected or the iterator constructor threw
      if (iterator != null) {
        iterator.close();
      }
    }

    return result;
  }

  /**
   * Instantiates the compaction strategy matching the given policy and stores it in this
   * compactor.
   *
   * @param compType the in-memory compaction policy; BASIC, EAGER and ADAPTIVE are supported,
   *                 NONE is rejected by an assertion (when assertions are enabled)
   * @param configuration the configuration handed to the strategy
   * @param cfName the column family name handed to the strategy
   * @throws IllegalArgumentIOException if the policy is none of the supported ones
   */
  void initiateCompactionStrategy(MemoryCompactionPolicy compType,
      Configuration configuration, String cfName) throws IllegalArgumentIOException {

    assert (compType != MemoryCompactionPolicy.NONE);

    switch (compType) {
      case BASIC:
        strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
        break;
      case EAGER:
        strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
        break;
      case ADAPTIVE:
        strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
        break;
      default:
        // sanity check
        throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
    }
  }
}