/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts the
 * compaction if requested. The compaction is interrupted and stopped by CompactingMemStore, for
 * example when another compaction needs to be started. Prior to compaction the MemStoreCompactor
 * evaluates the compacting ratio and aborts the compaction if it is not worthy. The MemStoreScanner
 * is used to traverse the compaction pipeline. The MemStoreScanner is included in internal store
 * scanner, where all compaction logic is implemented.
 * <p>
 * Thread safety: it is assumed that the compaction pipeline is immutable, therefore no special
 * synchronization is required.
 */
@InterfaceAudience.Private
public class MemStoreCompactor {

  public static final long DEEP_OVERHEAD =
    ClassSize.align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
    // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
    // "action" is an enum and thus it is a class with static final constants,
    // so counting only the size of the reference to it and not the size of the internals
      + Bytes.SIZEOF_INT // compactionKVMax
      + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
    );

  private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class);

  // the memstore whose pipeline this compactor works on
  private final CompactingMemStore compactingMemStore;

  // a static version of the segment list from the pipeline
  private VersionedSegmentsList versionedList;

  // a flag raised when compaction is requested to stop
  private final AtomicBoolean isInterrupted = new AtomicBoolean(false);

  // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
  private final int compactionKVMax;

  // decides, per pipeline snapshot, which action (noop / flatten / merge / compact) to take
  private MemStoreCompactionStrategy strategy;

  /**
   * Creates a compactor bound to the given memstore.
   * <p>
   * The cell batch limit is read from {@link HConstants#COMPACTION_KV_MAX} in the memstore's
   * configuration, falling back to {@link HConstants#COMPACTION_KV_MAX_DEFAULT}.
   * @param compactingMemStore the memstore whose segments are compacted
   * @param compactionPolicy   the in-memory compaction policy selecting the strategy
   * @throws IllegalArgumentIOException if the policy is not BASIC, EAGER or ADAPTIVE
   */
  public MemStoreCompactor(CompactingMemStore compactingMemStore,
    MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
    this.compactingMemStore = compactingMemStore;
    this.compactionKVMax = compactingMemStore.getConfiguration()
      .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
      compactingMemStore.getFamilyName());
  }

  @Override
  public String toString() {
    return this.strategy + ", compactionCellMax=" + this.compactionKVMax;
  }

  /**
   * Runs one in-memory compaction round over the current immutable segments.
   * <p>
   * A versioned snapshot of the immutable segments is taken, the coprocessor host (if any) is
   * notified before the compaction, the compaction is executed on the calling thread, and the
   * coprocessor host is notified afterwards even if the compaction fails.
   * @return {@code false} if the memstore has no immutable segments (nothing is done),
   *         {@code true} otherwise
   * @throws IOException if a coprocessor hook fails
   */
  public boolean start() throws IOException {
    if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
      return false;
    }

    // get a snapshot of the list of the segments from the pipeline,
    // this local copy of the list is marked with specific version
    versionedList = compactingMemStore.getImmutableSegments();

    HStore store = compactingMemStore.getStore();
    if (LOG.isTraceEnabled()) {
      // guarded so the region/family names are only resolved when tracing is on
      LOG.trace("Speculative compaction starting on {}/{}",
        store.getHRegion().getRegionInfo().getEncodedName(), store.getColumnFamilyName());
    }

    RegionCoprocessorHost cpHost = store.getCoprocessorHost();
    if (cpHost != null) {
      cpHost.preMemStoreCompaction(store);
    }
    try {
      doCompaction();
    } finally {
      if (cpHost != null) {
        cpHost.postMemStoreCompaction(store);
      }
    }
    return true;
  }

  /**
   * Requests cancellation of the compaction by raising the interruption flag. The request is
   * non-blocking; the compaction may still complete if the request arrives too late, because the
   * flag is only checked at specific points of the compaction.
   */
  public void stop() {
    isInterrupted.set(true);
  }

  /** Resets the statistics kept by the compaction strategy. */
  public void resetStats() {
    strategy.resetStats();
  }

  /**
   * Resets the interruption indicator and clears the segment list snapshot in order to allow good
   * garbage collection.
   */
  private void releaseResources() {
    isInterrupted.set(false);
    versionedList = null;
  }

  /**
   * Performs the compaction step chosen by the strategy for the current segment list snapshot.
   * <p>
   * Depending on the strategy's decision this does nothing (NOOP), flattens one segment of the
   * pipeline (FLATTEN*), or builds one substitute segment by merge/compaction and swaps it into
   * the pipeline. The interruption flag is checked before each stage. Whatever the outcome, the
   * compactor's resources are released and the memstore is told that in-memory compaction has
   * completed.
   */
  private void doCompaction() {
    ImmutableSegment result = null;
    boolean resultSwapped = false;
    MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
    boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE
      || nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
    try {
      if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
        return; // the compaction also doesn't start when interrupted
      }

      if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
        return;
      }
      if (
        nextStep == MemStoreCompactionStrategy.Action.FLATTEN
          || nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS
      ) {
        // some Segment in the pipeline is with SkipList index, make it flat
        compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
        return;
      }

      // Create one segment representing all segments in the compaction pipeline,
      // either by compaction or by merge
      if (!isInterrupted.get()) {
        result = createSubstitution(nextStep);
      }

      // Substitute the pipeline with one segment
      if (!isInterrupted.get()) {
        resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
        if (resultSwapped) {
          // update compaction strategy
          strategy.updateStats(result);
          // update the wal so it can be truncated and not get too long
          compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
        }
      }
    } catch (IOException e) {
      // keep the cause in the log instead of silently dropping it
      LOG.trace("Interrupting in-memory compaction for store={}",
        compactingMemStore.getFamilyName(), e);
      Thread.currentThread().interrupt();
    } finally {
      // For the MERGE case, if the result was created, but swap didn't happen,
      // we DON'T need to close the result segment (meaning its MSLAB)!
      // Because closing the result segment means closing the chunks of all segments
      // in the compaction pipeline, which still have ongoing scans.
      if (!merge && (result != null) && !resultSwapped) {
        result.close();
      }
      releaseResources();
      compactingMemStore.setInMemoryCompactionCompleted();
    }
  }

  /**
   * Creates the ImmutableSegment that substitutes the segments of the pipeline, either by merge
   * or by copy-compaction, based on the matching segments iterator.
   * <p>
   * The iterator is always closed before this method returns, including when the segment
   * creation throws.
   * @param action the strategy decision; must be COMPACT, MERGE or MERGE_COUNT_UNIQUE_KEYS
   * @return the newly created segment
   * @throws IOException      if creating the iterator or the segment fails
   * @throws RuntimeException if {@code action} is none of the supported actions
   */
  private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action)
    throws IOException {

    List<ImmutableSegment> segments = versionedList.getStoreSegments();
    for (ImmutableSegment s : segments) {
      s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed.
      // we skip empty segment when create MemStoreSegmentsIterator following.
    }

    MemStoreSegmentsIterator iterator = null;
    try {
      switch (action) {
        case COMPACT:
          iterator = new MemStoreCompactorSegmentsIterator(segments,
            compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore());

          return SegmentFactory.instance().createImmutableSegmentByCompaction(
            compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
            versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
        case MERGE:
        case MERGE_COUNT_UNIQUE_KEYS:
          iterator = new MemStoreMergerSegmentsIterator(segments,
            compactingMemStore.getComparator(), compactionKVMax);

          return SegmentFactory.instance().createImmutableSegmentByMerge(
            compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
            versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
        default:
          throw new RuntimeException("Unknown action " + action); // sanity check
      }
    } finally {
      // release the iterator on both the success and the failure path
      if (iterator != null) {
        iterator.close();
      }
    }
  }

  /**
   * Instantiates the compaction strategy matching the given policy.
   * @param compType      the in-memory compaction policy; expected not to be NONE
   * @param configuration the configuration handed to the strategy
   * @param cfName        the column family name handed to the strategy
   * @throws IllegalArgumentIOException if the policy is not BASIC, EAGER or ADAPTIVE
   */
  void initiateCompactionStrategy(MemoryCompactionPolicy compType, Configuration configuration,
    String cfName) throws IllegalArgumentIOException {

    assert (compType != MemoryCompactionPolicy.NONE);

    switch (compType) {
      case BASIC:
        strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
        break;
      case EAGER:
        strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
        break;
      case ADAPTIVE:
        strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
        break;
      default:
        // sanity check
        throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
    }
  }
}