001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.MemoryCompactionPolicy;
027import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.ClassSize;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
035
036/**
037 * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts
038 * the compaction if requested. The compaction is interrupted and stopped by CompactingMemStore,
039 * for example when another compaction needs to be started.
040 * Prior to compaction the MemStoreCompactor evaluates
041 * the compacting ratio and aborts the compaction if it is not worthy.
042 * The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner
043 * is included in internal store scanner, where all compaction logic is implemented.
044 * Threads safety: It is assumed that the compaction pipeline is immutable,
045 * therefore no special synchronization is required.
046 */
047@InterfaceAudience.Private
048public class MemStoreCompactor {
049
050  public static final long DEEP_OVERHEAD = ClassSize
051      .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
052          // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
053          // "action" is an enum and thus it is a class with static final constants,
054          // so counting only the size of the reference to it and not the size of the internals
055          + Bytes.SIZEOF_INT        // compactionKVMax
056          + ClassSize.ATOMIC_BOOLEAN    // isInterrupted (the internals)
057      );
058
059  private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class);
060  private CompactingMemStore compactingMemStore;
061
062  // a static version of the segment list from the pipeline
063  private VersionedSegmentsList versionedList;
064
065  // a flag raised when compaction is requested to stop
066  private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
067
068  // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
069  private final int compactionKVMax;
070
071  private MemStoreCompactionStrategy strategy;
072
073  public MemStoreCompactor(CompactingMemStore compactingMemStore,
074      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
075    this.compactingMemStore = compactingMemStore;
076    this.compactionKVMax = compactingMemStore.getConfiguration()
077        .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
078    initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
079        compactingMemStore.getFamilyName());
080  }
081
082  @Override
083  public String toString() {
084    return this.strategy + ", compactionCellMax=" + this.compactionKVMax;
085  }
086
087  /**----------------------------------------------------------------------
088   * The request to dispatch the compaction asynchronous task.
089   * The method returns true if compaction was successfully dispatched, or false if there
090   * is already an ongoing compaction or no segments to compact.
091   */
092  public boolean start() throws IOException {
093    if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
094      return false;
095    }
096
097    // get a snapshot of the list of the segments from the pipeline,
098    // this local copy of the list is marked with specific version
099    versionedList = compactingMemStore.getImmutableSegments();
100    LOG.trace("Speculative compaction starting on {}/{}",
101        compactingMemStore.getStore().getHRegion().getRegionInfo().getEncodedName(),
102        compactingMemStore.getStore().getColumnFamilyName());
103    HStore store = compactingMemStore.getStore();
104    RegionCoprocessorHost cpHost = store.getCoprocessorHost();
105    if (cpHost != null) {
106      cpHost.preMemStoreCompaction(store);
107    }
108    try {
109      doCompaction();
110    } finally {
111      if (cpHost != null) {
112        cpHost.postMemStoreCompaction(store);
113      }
114    }
115    return true;
116  }
117
118  /**----------------------------------------------------------------------
119  * The request to cancel the compaction asynchronous task
120  * The compaction may still happen if the request was sent too late
121  * Non-blocking request
122  */
123  public void stop() {
124      isInterrupted.compareAndSet(false, true);
125  }
126
127
128  public void resetStats() {
129    strategy.resetStats();
130  }
131
132  /**----------------------------------------------------------------------
133  * Reset the interruption indicator and clear the pointers in order to allow good
134  * garbage collection
135  */
136  private void releaseResources() {
137    isInterrupted.set(false);
138    versionedList = null;
139  }
140
141  /**----------------------------------------------------------------------
142  * The worker thread performs the compaction asynchronously.
143  * The solo (per compactor) thread only reads the compaction pipeline.
144  * There is at most one thread per memstore instance.
145  */
146  private void doCompaction() {
147    ImmutableSegment result = null;
148    boolean resultSwapped = false;
149    if (isInterrupted.get()) {      // if the entire process is interrupted cancel flattening
150      return;           // the compaction also doesn't start when interrupted
151    }
152
153    MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
154    boolean merge =
155        (nextStep == MemStoreCompactionStrategy.Action.MERGE ||
156            nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
157    try {
158      if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
159        return;
160      }
161      if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN ||
162          nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
163        // some Segment in the pipeline is with SkipList index, make it flat
164        compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
165        return;
166      }
167
168      // Create one segment representing all segments in the compaction pipeline,
169      // either by compaction or by merge
170      if (!isInterrupted.get()) {
171        result = createSubstitution(nextStep);
172      }
173
174      // Substitute the pipeline with one segment
175      if (!isInterrupted.get()) {
176        resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
177        if (resultSwapped) {
178          // update compaction strategy
179          strategy.updateStats(result);
180          // update the wal so it can be truncated and not get too long
181          compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
182        }
183      }
184    } catch (IOException e) {
185      LOG.trace("Interrupting in-memory compaction for store={}",
186          compactingMemStore.getFamilyName());
187      Thread.currentThread().interrupt();
188    } finally {
189      // For the MERGE case, if the result was created, but swap didn't happen,
190      // we DON'T need to close the result segment (meaning its MSLAB)!
191      // Because closing the result segment means closing the chunks of all segments
192      // in the compaction pipeline, which still have ongoing scans.
193      if (!merge && (result != null) && !resultSwapped) {
194        result.close();
195      }
196      releaseResources();
197    }
198
199  }
200
201  /**----------------------------------------------------------------------
202   * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
203   * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
204   */
205  private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws
206      IOException {
207
208    ImmutableSegment result = null;
209    MemStoreSegmentsIterator iterator = null;
210
211    switch (action) {
212      case COMPACT:
213        iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
214            compactingMemStore.getComparator(),
215            compactionKVMax, compactingMemStore.getStore());
216
217        result = SegmentFactory.instance().createImmutableSegmentByCompaction(
218          compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
219          versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
220        iterator.close();
221        break;
222      case MERGE:
223      case MERGE_COUNT_UNIQUE_KEYS:
224        iterator =
225            new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
226                compactingMemStore.getComparator(), compactionKVMax);
227
228        result = SegmentFactory.instance().createImmutableSegmentByMerge(
229          compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
230          versionedList.getNumOfCells(), versionedList.getStoreSegments(),
231          compactingMemStore.getIndexType(), action);
232        iterator.close();
233        break;
234      default:
235        throw new RuntimeException("Unknown action " + action); // sanity check
236    }
237
238    return result;
239  }
240
241  @VisibleForTesting
242  void initiateCompactionStrategy(MemoryCompactionPolicy compType,
243      Configuration configuration, String cfName) throws IllegalArgumentIOException {
244
245    assert (compType !=MemoryCompactionPolicy.NONE);
246
247    switch (compType){
248      case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
249        break;
250      case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
251        break;
252      case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
253        break;
254      default:
255        // sanity check
256        throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
257    }
258  }
259}