001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicBoolean; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.MemoryCompactionPolicy; 028import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 029import org.apache.hadoop.hbase.util.Bytes; 030import org.apache.hadoop.hbase.util.ClassSize; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 036 037/** 038 * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts 039 * the compaction if requested. The compaction is interrupted and stopped by CompactingMemStore, 040 * for example when another compaction needs to be started. 041 * Prior to compaction the MemStoreCompactor evaluates 042 * the compacting ratio and aborts the compaction if it is not worthy. 043 * The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner 044 * is included in internal store scanner, where all compaction logic is implemented. 045 * Threads safety: It is assumed that the compaction pipeline is immutable, 046 * therefore no special synchronization is required. 047 */ 048@InterfaceAudience.Private 049public class MemStoreCompactor { 050 051 public static final long DEEP_OVERHEAD = ClassSize 052 .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE 053 // compactingMemStore, versionedList, isInterrupted, strategy (the reference) 054 // "action" is an enum and thus it is a class with static final constants, 055 // so counting only the size of the reference to it and not the size of the internals 056 + Bytes.SIZEOF_INT // compactionKVMax 057 + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals) 058 ); 059 060 private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class); 061 private CompactingMemStore compactingMemStore; 062 063 // a static version of the segment list from the pipeline 064 private VersionedSegmentsList versionedList; 065 066 // a flag raised when compaction is requested to stop 067 private final AtomicBoolean isInterrupted = new AtomicBoolean(false); 068 069 // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator 070 private final int compactionKVMax; 071 072 private MemStoreCompactionStrategy strategy; 073 074 public MemStoreCompactor(CompactingMemStore compactingMemStore, 075 MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException { 076 this.compactingMemStore = compactingMemStore; 077 this.compactionKVMax = compactingMemStore.getConfiguration() 078 .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); 079 initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(), 080 compactingMemStore.getFamilyName()); 081 } 082 083 @Override 084 public String toString() { 085 return this.strategy + ", compactionCellMax=" + this.compactionKVMax; 086 } 087 088 /**---------------------------------------------------------------------- 089 * The request to dispatch the compaction asynchronous task. 090 * The method returns true if compaction was successfully dispatched, or false if there 091 * is already an ongoing compaction or no segments to compact. 092 */ 093 public boolean start() throws IOException { 094 if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline 095 return false; 096 } 097 098 // get a snapshot of the list of the segments from the pipeline, 099 // this local copy of the list is marked with specific version 100 versionedList = compactingMemStore.getImmutableSegments(); 101 LOG.trace("Speculative compaction starting on {}/{}", 102 compactingMemStore.getStore().getHRegion().getRegionInfo().getEncodedName(), 103 compactingMemStore.getStore().getColumnFamilyName()); 104 HStore store = compactingMemStore.getStore(); 105 RegionCoprocessorHost cpHost = store.getCoprocessorHost(); 106 if (cpHost != null) { 107 cpHost.preMemStoreCompaction(store); 108 } 109 try { 110 doCompaction(); 111 } finally { 112 if (cpHost != null) { 113 cpHost.postMemStoreCompaction(store); 114 } 115 } 116 return true; 117 } 118 119 /**---------------------------------------------------------------------- 120 * The request to cancel the compaction asynchronous task 121 * The compaction may still happen if the request was sent too late 122 * Non-blocking request 123 */ 124 public void stop() { 125 isInterrupted.compareAndSet(false, true); 126 } 127 128 129 public void resetStats() { 130 strategy.resetStats(); 131 } 132 133 /**---------------------------------------------------------------------- 134 * Reset the interruption indicator and clear the pointers in order to allow good 135 * garbage collection 136 */ 137 private void releaseResources() { 138 isInterrupted.set(false); 139 versionedList = null; 140 } 141 142 /**---------------------------------------------------------------------- 143 * The worker thread performs the compaction asynchronously. 144 * The solo (per compactor) thread only reads the compaction pipeline. 145 * There is at most one thread per memstore instance. 146 */ 147 private void doCompaction() { 148 ImmutableSegment result = null; 149 boolean resultSwapped = false; 150 MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList); 151 boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE || 152 nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS); 153 try { 154 if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening 155 return; // the compaction also doesn't start when interrupted 156 } 157 158 if (nextStep == MemStoreCompactionStrategy.Action.NOOP) { 159 return; 160 } 161 if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN 162 || nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) { 163 // some Segment in the pipeline is with SkipList index, make it flat 164 compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep); 165 return; 166 } 167 168 // Create one segment representing all segments in the compaction pipeline, 169 // either by compaction or by merge 170 if (!isInterrupted.get()) { 171 result = createSubstitution(nextStep); 172 } 173 174 // Substitute the pipeline with one segment 175 if (!isInterrupted.get()) { 176 resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge); 177 if (resultSwapped) { 178 // update compaction strategy 179 strategy.updateStats(result); 180 // update the wal so it can be truncated and not get too long 181 compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater 182 } 183 } 184 } catch (IOException e) { 185 LOG.trace("Interrupting in-memory compaction for store={}", 186 compactingMemStore.getFamilyName()); 187 Thread.currentThread().interrupt(); 188 } finally { 189 // For the MERGE case, if the result was created, but swap didn't happen, 190 // we DON'T need to close the result segment (meaning its MSLAB)! 191 // Because closing the result segment means closing the chunks of all segments 192 // in the compaction pipeline, which still have ongoing scans. 193 if (!merge && (result != null) && !resultSwapped) { 194 result.close(); 195 } 196 releaseResources(); 197 compactingMemStore.setInMemoryCompactionCompleted(); 198 } 199 200 } 201 202 /**---------------------------------------------------------------------- 203 * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the 204 * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned. 205 */ 206 private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws 207 IOException { 208 209 ImmutableSegment result = null; 210 MemStoreSegmentsIterator iterator = null; 211 List<ImmutableSegment> segments = versionedList.getStoreSegments(); 212 for (ImmutableSegment s : segments) { 213 s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed 214 } 215 216 switch (action) { 217 case COMPACT: 218 iterator = new MemStoreCompactorSegmentsIterator(segments, 219 compactingMemStore.getComparator(), 220 compactionKVMax, compactingMemStore.getStore()); 221 222 result = SegmentFactory.instance().createImmutableSegmentByCompaction( 223 compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, 224 versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action); 225 iterator.close(); 226 break; 227 case MERGE: 228 case MERGE_COUNT_UNIQUE_KEYS: 229 iterator = 230 new MemStoreMergerSegmentsIterator(segments, 231 compactingMemStore.getComparator(), compactionKVMax); 232 233 result = SegmentFactory.instance().createImmutableSegmentByMerge( 234 compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, 235 versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action); 236 iterator.close(); 237 break; 238 default: 239 throw new RuntimeException("Unknown action " + action); // sanity check 240 } 241 242 return result; 243 } 244 245 @VisibleForTesting 246 void initiateCompactionStrategy(MemoryCompactionPolicy compType, 247 Configuration configuration, String cfName) throws IllegalArgumentIOException { 248 249 assert (compType !=MemoryCompactionPolicy.NONE); 250 251 switch (compType){ 252 case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName); 253 break; 254 case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName); 255 break; 256 case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName); 257 break; 258 default: 259 // sanity check 260 throw new IllegalArgumentIOException("Unknown memory compaction type " + compType); 261 } 262 } 263}