001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.MemoryCompactionPolicy;
028import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ClassSize;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts
037 * the compaction if requested. The compaction is interrupted and stopped by CompactingMemStore,
038 * for example when another compaction needs to be started.
039 * Prior to compaction the MemStoreCompactor evaluates
040 * the compacting ratio and aborts the compaction if it is not worthy.
041 * The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner
042 * is included in internal store scanner, where all compaction logic is implemented.
043 * Threads safety: It is assumed that the compaction pipeline is immutable,
044 * therefore no special synchronization is required.
045 */
046@InterfaceAudience.Private
047public class MemStoreCompactor {
048
049  public static final long DEEP_OVERHEAD = ClassSize
050      .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
051          // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
052          // "action" is an enum and thus it is a class with static final constants,
053          // so counting only the size of the reference to it and not the size of the internals
054          + Bytes.SIZEOF_INT        // compactionKVMax
055          + ClassSize.ATOMIC_BOOLEAN    // isInterrupted (the internals)
056      );
057
058  private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class);
059  private CompactingMemStore compactingMemStore;
060
061  // a static version of the segment list from the pipeline
062  private VersionedSegmentsList versionedList;
063
064  // a flag raised when compaction is requested to stop
065  private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
066
067  // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
068  private final int compactionKVMax;
069
070  private MemStoreCompactionStrategy strategy;
071
072  public MemStoreCompactor(CompactingMemStore compactingMemStore,
073      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
074    this.compactingMemStore = compactingMemStore;
075    this.compactionKVMax = compactingMemStore.getConfiguration()
076        .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
077    initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
078        compactingMemStore.getFamilyName());
079  }
080
081  @Override
082  public String toString() {
083    return this.strategy + ", compactionCellMax=" + this.compactionKVMax;
084  }
085
086  /**----------------------------------------------------------------------
087   * The request to dispatch the compaction asynchronous task.
088   * The method returns true if compaction was successfully dispatched, or false if there
089   * is already an ongoing compaction or no segments to compact.
090   */
091  public boolean start() throws IOException {
092    if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
093      return false;
094    }
095
096    // get a snapshot of the list of the segments from the pipeline,
097    // this local copy of the list is marked with specific version
098    versionedList = compactingMemStore.getImmutableSegments();
099    LOG.trace("Speculative compaction starting on {}/{}",
100        compactingMemStore.getStore().getHRegion().getRegionInfo().getEncodedName(),
101        compactingMemStore.getStore().getColumnFamilyName());
102    HStore store = compactingMemStore.getStore();
103    RegionCoprocessorHost cpHost = store.getCoprocessorHost();
104    if (cpHost != null) {
105      cpHost.preMemStoreCompaction(store);
106    }
107    try {
108      doCompaction();
109    } finally {
110      if (cpHost != null) {
111        cpHost.postMemStoreCompaction(store);
112      }
113    }
114    return true;
115  }
116
117  /**----------------------------------------------------------------------
118  * The request to cancel the compaction asynchronous task
119  * The compaction may still happen if the request was sent too late
120  * Non-blocking request
121  */
122  public void stop() {
123      isInterrupted.compareAndSet(false, true);
124  }
125
126
127  public void resetStats() {
128    strategy.resetStats();
129  }
130
131  /**----------------------------------------------------------------------
132  * Reset the interruption indicator and clear the pointers in order to allow good
133  * garbage collection
134  */
135  private void releaseResources() {
136    isInterrupted.set(false);
137    versionedList = null;
138  }
139
140  /**----------------------------------------------------------------------
141  * The worker thread performs the compaction asynchronously.
142  * The solo (per compactor) thread only reads the compaction pipeline.
143  * There is at most one thread per memstore instance.
144  */
145  private void doCompaction() {
146    ImmutableSegment result = null;
147    boolean resultSwapped = false;
148    MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
149    boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE ||
150        nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
151    try {
152      if (isInterrupted.get()) {      // if the entire process is interrupted cancel flattening
153        return;           // the compaction also doesn't start when interrupted
154      }
155
156      if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
157        return;
158      }
159      if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN
160          || nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
161        // some Segment in the pipeline is with SkipList index, make it flat
162        compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
163        return;
164      }
165
166      // Create one segment representing all segments in the compaction pipeline,
167      // either by compaction or by merge
168      if (!isInterrupted.get()) {
169        result = createSubstitution(nextStep);
170      }
171
172      // Substitute the pipeline with one segment
173      if (!isInterrupted.get()) {
174        resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
175        if (resultSwapped) {
176          // update compaction strategy
177          strategy.updateStats(result);
178          // update the wal so it can be truncated and not get too long
179          compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
180        }
181      }
182    } catch (IOException e) {
183      LOG.trace("Interrupting in-memory compaction for store={}",
184          compactingMemStore.getFamilyName());
185      Thread.currentThread().interrupt();
186    } finally {
187      // For the MERGE case, if the result was created, but swap didn't happen,
188      // we DON'T need to close the result segment (meaning its MSLAB)!
189      // Because closing the result segment means closing the chunks of all segments
190      // in the compaction pipeline, which still have ongoing scans.
191      if (!merge && (result != null) && !resultSwapped) {
192        result.close();
193      }
194      releaseResources();
195      compactingMemStore.setInMemoryCompactionCompleted();
196    }
197
198  }
199
200  /**----------------------------------------------------------------------
201   * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
202   * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
203   */
204  private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws
205      IOException {
206
207    ImmutableSegment result = null;
208    MemStoreSegmentsIterator iterator = null;
209    List<ImmutableSegment> segments = versionedList.getStoreSegments();
210    for (ImmutableSegment s : segments) {
211      s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
212    }
213
214    switch (action) {
215      case COMPACT:
216        iterator = new MemStoreCompactorSegmentsIterator(segments,
217            compactingMemStore.getComparator(),
218            compactionKVMax, compactingMemStore.getStore());
219
220        result = SegmentFactory.instance().createImmutableSegmentByCompaction(
221          compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
222          versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
223        iterator.close();
224        break;
225      case MERGE:
226      case MERGE_COUNT_UNIQUE_KEYS:
227        iterator =
228            new MemStoreMergerSegmentsIterator(segments,
229                compactingMemStore.getComparator(), compactionKVMax);
230
231        result = SegmentFactory.instance().createImmutableSegmentByMerge(
232          compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
233          versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
234        iterator.close();
235        break;
236      default:
237        throw new RuntimeException("Unknown action " + action); // sanity check
238    }
239
240    return result;
241  }
242
243  void initiateCompactionStrategy(MemoryCompactionPolicy compType,
244      Configuration configuration, String cfName) throws IllegalArgumentIOException {
245
246    assert (compType !=MemoryCompactionPolicy.NONE);
247
248    switch (compType){
249      case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
250        break;
251      case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
252        break;
253      case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
254        break;
255      default:
256        // sanity check
257        throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
258    }
259  }
260}