001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.atomic.AtomicBoolean;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.MemoryCompactionPolicy;
026import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
027import org.apache.hadoop.hbase.util.Bytes;
028import org.apache.hadoop.hbase.util.ClassSize;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts the
035 * compaction if requested. The compaction is interrupted and stopped by CompactingMemStore, for
036 * example when another compaction needs to be started. Prior to compaction the MemStoreCompactor
037 * evaluates the compacting ratio and aborts the compaction if it is not worthy. The MemStoreScanner
038 * is used to traverse the compaction pipeline. The MemStoreScanner is included in internal store
039 * scanner, where all compaction logic is implemented. Threads safety: It is assumed that the
040 * compaction pipeline is immutable, therefore no special synchronization is required.
041 */
042@InterfaceAudience.Private
043public class MemStoreCompactor {
044
045  public static final long DEEP_OVERHEAD =
046    ClassSize.align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
047    // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
048    // "action" is an enum and thus it is a class with static final constants,
049    // so counting only the size of the reference to it and not the size of the internals
050      + Bytes.SIZEOF_INT // compactionKVMax
051      + ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
052    );
053
054  private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class);
055  private CompactingMemStore compactingMemStore;
056
057  // a static version of the segment list from the pipeline
058  private VersionedSegmentsList versionedList;
059
060  // a flag raised when compaction is requested to stop
061  private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
062
063  // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
064  private final int compactionKVMax;
065
066  private MemStoreCompactionStrategy strategy;
067
068  public MemStoreCompactor(CompactingMemStore compactingMemStore,
069    MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
070    this.compactingMemStore = compactingMemStore;
071    this.compactionKVMax = compactingMemStore.getConfiguration()
072      .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
073    initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
074      compactingMemStore.getFamilyName());
075  }
076
077  @Override
078  public String toString() {
079    return this.strategy + ", compactionCellMax=" + this.compactionKVMax;
080  }
081
082  /**
083   * ---------------------------------------------------------------------- The request to dispatch
084   * the compaction asynchronous task. The method returns true if compaction was successfully
085   * dispatched, or false if there is already an ongoing compaction or no segments to compact.
086   */
087  public boolean start() throws IOException {
088    if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
089      return false;
090    }
091
092    // get a snapshot of the list of the segments from the pipeline,
093    // this local copy of the list is marked with specific version
094    versionedList = compactingMemStore.getImmutableSegments();
095    LOG.trace("Speculative compaction starting on {}/{}",
096      compactingMemStore.getStore().getHRegion().getRegionInfo().getEncodedName(),
097      compactingMemStore.getStore().getColumnFamilyName());
098    HStore store = compactingMemStore.getStore();
099    RegionCoprocessorHost cpHost = store.getCoprocessorHost();
100    if (cpHost != null) {
101      cpHost.preMemStoreCompaction(store);
102    }
103    try {
104      doCompaction();
105    } finally {
106      if (cpHost != null) {
107        cpHost.postMemStoreCompaction(store);
108      }
109    }
110    return true;
111  }
112
113  /**
114   * ---------------------------------------------------------------------- The request to cancel
115   * the compaction asynchronous task The compaction may still happen if the request was sent too
116   * late Non-blocking request
117   */
118  public void stop() {
119    isInterrupted.compareAndSet(false, true);
120  }
121
122  public void resetStats() {
123    strategy.resetStats();
124  }
125
126  /**
127   * ---------------------------------------------------------------------- Reset the interruption
128   * indicator and clear the pointers in order to allow good garbage collection
129   */
130  private void releaseResources() {
131    isInterrupted.set(false);
132    versionedList = null;
133  }
134
135  /**
136   * ---------------------------------------------------------------------- The worker thread
137   * performs the compaction asynchronously. The solo (per compactor) thread only reads the
138   * compaction pipeline. There is at most one thread per memstore instance.
139   */
140  private void doCompaction() {
141    ImmutableSegment result = null;
142    boolean resultSwapped = false;
143    MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
144    boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE
145      || nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
146    try {
147      if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
148        return; // the compaction also doesn't start when interrupted
149      }
150
151      if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
152        return;
153      }
154      if (
155        nextStep == MemStoreCompactionStrategy.Action.FLATTEN
156          || nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS
157      ) {
158        // some Segment in the pipeline is with SkipList index, make it flat
159        compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
160        return;
161      }
162
163      // Create one segment representing all segments in the compaction pipeline,
164      // either by compaction or by merge
165      if (!isInterrupted.get()) {
166        result = createSubstitution(nextStep);
167      }
168
169      // Substitute the pipeline with one segment
170      if (!isInterrupted.get()) {
171        resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
172        if (resultSwapped) {
173          // update compaction strategy
174          strategy.updateStats(result);
175          // update the wal so it can be truncated and not get too long
176          compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
177        }
178      }
179    } catch (IOException e) {
180      LOG.trace("Interrupting in-memory compaction for store={}",
181        compactingMemStore.getFamilyName());
182      Thread.currentThread().interrupt();
183    } finally {
184      // For the MERGE case, if the result was created, but swap didn't happen,
185      // we DON'T need to close the result segment (meaning its MSLAB)!
186      // Because closing the result segment means closing the chunks of all segments
187      // in the compaction pipeline, which still have ongoing scans.
188      if (!merge && (result != null) && !resultSwapped) {
189        result.close();
190      }
191      releaseResources();
192      compactingMemStore.setInMemoryCompactionCompleted();
193    }
194
195  }
196
197  /**
198   * ---------------------------------------------------------------------- Creation of the
199   * ImmutableSegment either by merge or copy-compact of the segments of the pipeline, based on the
200   * Compactor Iterator. The new ImmutableSegment is returned.
201   */
202  private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action)
203    throws IOException {
204
205    ImmutableSegment result = null;
206    MemStoreSegmentsIterator iterator = null;
207    List<ImmutableSegment> segments = versionedList.getStoreSegments();
208    for (ImmutableSegment s : segments) {
209      s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed.
210      // we skip empty segment when create MemStoreSegmentsIterator following.
211    }
212
213    switch (action) {
214      case COMPACT:
215        iterator = new MemStoreCompactorSegmentsIterator(segments,
216          compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore());
217
218        result = SegmentFactory.instance().createImmutableSegmentByCompaction(
219          compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
220          versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
221        iterator.close();
222        break;
223      case MERGE:
224      case MERGE_COUNT_UNIQUE_KEYS:
225        iterator = new MemStoreMergerSegmentsIterator(segments, compactingMemStore.getComparator(),
226          compactionKVMax);
227
228        result = SegmentFactory.instance().createImmutableSegmentByMerge(
229          compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
230          versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
231        iterator.close();
232        break;
233      default:
234        throw new RuntimeException("Unknown action " + action); // sanity check
235    }
236
237    return result;
238  }
239
240  void initiateCompactionStrategy(MemoryCompactionPolicy compType, Configuration configuration,
241    String cfName) throws IllegalArgumentIOException {
242
243    assert (compType != MemoryCompactionPolicy.NONE);
244
245    switch (compType) {
246      case BASIC:
247        strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
248        break;
249      case EAGER:
250        strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
251        break;
252      case ADAPTIVE:
253        strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
254        break;
255      default:
256        // sanity check
257        throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
258    }
259  }
260}