001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.MemoryCompactionPolicy;
028import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.hbase.util.ClassSize;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
036
037/**
038 * The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts
039 * the compaction if requested. The compaction is interrupted and stopped by CompactingMemStore,
040 * for example when another compaction needs to be started.
041 * Prior to compaction the MemStoreCompactor evaluates
042 * the compacting ratio and aborts the compaction if it is not worthy.
043 * The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner
044 * is included in internal store scanner, where all compaction logic is implemented.
045 * Threads safety: It is assumed that the compaction pipeline is immutable,
046 * therefore no special synchronization is required.
047 */
048@InterfaceAudience.Private
049public class MemStoreCompactor {
050
051  public static final long DEEP_OVERHEAD = ClassSize
052      .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
053          // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
054          // "action" is an enum and thus it is a class with static final constants,
055          // so counting only the size of the reference to it and not the size of the internals
056          + Bytes.SIZEOF_INT        // compactionKVMax
057          + ClassSize.ATOMIC_BOOLEAN    // isInterrupted (the internals)
058      );
059
060  private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class);
061  private CompactingMemStore compactingMemStore;
062
063  // a static version of the segment list from the pipeline
064  private VersionedSegmentsList versionedList;
065
066  // a flag raised when compaction is requested to stop
067  private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
068
069  // the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
070  private final int compactionKVMax;
071
072  private MemStoreCompactionStrategy strategy;
073
074  public MemStoreCompactor(CompactingMemStore compactingMemStore,
075      MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
076    this.compactingMemStore = compactingMemStore;
077    this.compactionKVMax = compactingMemStore.getConfiguration()
078        .getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
079    initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
080        compactingMemStore.getFamilyName());
081  }
082
083  @Override
084  public String toString() {
085    return this.strategy + ", compactionCellMax=" + this.compactionKVMax;
086  }
087
088  /**----------------------------------------------------------------------
089   * The request to dispatch the compaction asynchronous task.
090   * The method returns true if compaction was successfully dispatched, or false if there
091   * is already an ongoing compaction or no segments to compact.
092   */
093  public boolean start() throws IOException {
094    if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
095      return false;
096    }
097
098    // get a snapshot of the list of the segments from the pipeline,
099    // this local copy of the list is marked with specific version
100    versionedList = compactingMemStore.getImmutableSegments();
101    LOG.trace("Speculative compaction starting on {}/{}",
102        compactingMemStore.getStore().getHRegion().getRegionInfo().getEncodedName(),
103        compactingMemStore.getStore().getColumnFamilyName());
104    HStore store = compactingMemStore.getStore();
105    RegionCoprocessorHost cpHost = store.getCoprocessorHost();
106    if (cpHost != null) {
107      cpHost.preMemStoreCompaction(store);
108    }
109    try {
110      doCompaction();
111    } finally {
112      if (cpHost != null) {
113        cpHost.postMemStoreCompaction(store);
114      }
115    }
116    return true;
117  }
118
119  /**----------------------------------------------------------------------
120  * The request to cancel the compaction asynchronous task
121  * The compaction may still happen if the request was sent too late
122  * Non-blocking request
123  */
124  public void stop() {
125      isInterrupted.compareAndSet(false, true);
126  }
127
128
129  public void resetStats() {
130    strategy.resetStats();
131  }
132
133  /**----------------------------------------------------------------------
134  * Reset the interruption indicator and clear the pointers in order to allow good
135  * garbage collection
136  */
137  private void releaseResources() {
138    isInterrupted.set(false);
139    versionedList = null;
140  }
141
142  /**----------------------------------------------------------------------
143  * The worker thread performs the compaction asynchronously.
144  * The solo (per compactor) thread only reads the compaction pipeline.
145  * There is at most one thread per memstore instance.
146  */
147  private void doCompaction() {
148    ImmutableSegment result = null;
149    boolean resultSwapped = false;
150    MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
151    boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE ||
152        nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
153    try {
154      if (isInterrupted.get()) {      // if the entire process is interrupted cancel flattening
155        return;           // the compaction also doesn't start when interrupted
156      }
157
158      if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
159        return;
160      }
161      if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN
162          || nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
163        // some Segment in the pipeline is with SkipList index, make it flat
164        compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
165        return;
166      }
167
168      // Create one segment representing all segments in the compaction pipeline,
169      // either by compaction or by merge
170      if (!isInterrupted.get()) {
171        result = createSubstitution(nextStep);
172      }
173
174      // Substitute the pipeline with one segment
175      if (!isInterrupted.get()) {
176        resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
177        if (resultSwapped) {
178          // update compaction strategy
179          strategy.updateStats(result);
180          // update the wal so it can be truncated and not get too long
181          compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
182        }
183      }
184    } catch (IOException e) {
185      LOG.trace("Interrupting in-memory compaction for store={}",
186          compactingMemStore.getFamilyName());
187      Thread.currentThread().interrupt();
188    } finally {
189      // For the MERGE case, if the result was created, but swap didn't happen,
190      // we DON'T need to close the result segment (meaning its MSLAB)!
191      // Because closing the result segment means closing the chunks of all segments
192      // in the compaction pipeline, which still have ongoing scans.
193      if (!merge && (result != null) && !resultSwapped) {
194        result.close();
195      }
196      releaseResources();
197      compactingMemStore.setInMemoryCompactionCompleted();
198    }
199
200  }
201
202  /**----------------------------------------------------------------------
203   * Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
204   * pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
205   */
206  private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws
207      IOException {
208
209    ImmutableSegment result = null;
210    MemStoreSegmentsIterator iterator = null;
211    List<ImmutableSegment> segments = versionedList.getStoreSegments();
212    for (ImmutableSegment s : segments) {
213      s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
214    }
215
216    switch (action) {
217      case COMPACT:
218        iterator = new MemStoreCompactorSegmentsIterator(segments,
219            compactingMemStore.getComparator(),
220            compactionKVMax, compactingMemStore.getStore());
221
222        result = SegmentFactory.instance().createImmutableSegmentByCompaction(
223          compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
224          versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
225        iterator.close();
226        break;
227      case MERGE:
228      case MERGE_COUNT_UNIQUE_KEYS:
229        iterator =
230            new MemStoreMergerSegmentsIterator(segments,
231                compactingMemStore.getComparator(), compactionKVMax);
232
233        result = SegmentFactory.instance().createImmutableSegmentByMerge(
234          compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
235          versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
236        iterator.close();
237        break;
238      default:
239        throw new RuntimeException("Unknown action " + action); // sanity check
240    }
241
242    return result;
243  }
244
245  @VisibleForTesting
246  void initiateCompactionStrategy(MemoryCompactionPolicy compType,
247      Configuration configuration, String cfName) throws IllegalArgumentIOException {
248
249    assert (compType !=MemoryCompactionPolicy.NONE);
250
251    switch (compType){
252      case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
253        break;
254      case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
255        break;
256      case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
257        break;
258      default:
259        // sanity check
260        throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
261    }
262  }
263}