001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.yetus.audience.InterfaceAudience;
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025/**
026 * MemStoreCompactionStrategy is the root of a class hierarchy which defines the strategy for
027 * choosing the next action to apply in an (in-memory) memstore compaction. Possible action are: -
028 * No-op - do nothing - Flatten - to change the segment's index from CSLM to a flat representation -
029 * Merge - to merge the indices of the segments in the pipeline - Compact - to merge the indices
030 * while removing data redundancies In addition while applying flat/merge actions it is possible to
031 * count the number of unique keys in the result segment.
032 */
033@InterfaceAudience.Private
034public abstract class MemStoreCompactionStrategy {
035
036  protected static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactionStrategy.class);
037  // The upper bound for the number of segments we store in the pipeline prior to merging.
038  public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
039    "hbase.hregion.compacting.pipeline.segments.limit";
040  public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 2;
041
042  /**
043   * Types of actions to be done on the pipeline upon MemStoreCompaction invocation. Note that every
044   * value covers the previous ones, i.e. if MERGE is the action it implies that the youngest
045   * segment is going to be flatten anyway.
046   */
047  public enum Action {
048    NOOP,
049    FLATTEN, // flatten a segment in the pipeline
050    FLATTEN_COUNT_UNIQUE_KEYS, // flatten a segment in the pipeline and count its unique keys
051    MERGE, // merge all the segments in the pipeline into one
052    MERGE_COUNT_UNIQUE_KEYS, // merge all pipeline segments into one and count its unique keys
053    COMPACT // compact the data of all pipeline segments
054  }
055
056  protected final String cfName;
057  // The limit on the number of the segments in the pipeline
058  protected final int pipelineThreshold;
059
060  public MemStoreCompactionStrategy(Configuration conf, String cfName) {
061    this.cfName = cfName;
062    if (conf == null) {
063      pipelineThreshold = COMPACTING_MEMSTORE_THRESHOLD_DEFAULT;
064    } else {
065      pipelineThreshold = // get the limit on the number of the segments in the pipeline
066        conf.getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY, COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
067    }
068  }
069
070  @Override
071  public String toString() {
072    return getName() + ", pipelineThreshold=" + this.pipelineThreshold;
073  }
074
075  protected abstract String getName();
076
077  // get next compaction action to apply on compaction pipeline
078  public abstract Action getAction(VersionedSegmentsList versionedList);
079
080  // update policy stats based on the segment that replaced previous versioned list (in
081  // compaction pipeline)
082  public void updateStats(Segment replacement) {
083  }
084
085  // resets policy stats
086  public void resetStats() {
087  }
088
089  protected Action simpleMergeOrFlatten(VersionedSegmentsList versionedList, String strategy) {
090    int numOfSegments = versionedList.getNumOfSegments();
091    if (numOfSegments > pipelineThreshold) {
092      // to avoid too many segments, merge now
093      LOG.trace("Strategy={}, store={}; merging {} segments", strategy, cfName, numOfSegments);
094      return getMergingAction();
095    }
096
097    // just flatten a segment
098    LOG.trace("Strategy={}, store={}; flattening a segment", strategy, cfName);
099    return getFlattenAction();
100  }
101
102  protected Action getMergingAction() {
103    return Action.MERGE;
104  }
105
106  protected Action getFlattenAction() {
107    return Action.FLATTEN;
108  }
109
110  protected Action compact(VersionedSegmentsList versionedList, String strategyInfo) {
111    int numOfSegments = versionedList.getNumOfSegments();
112    LOG.trace("{} in-memory compaction for store={} compacting {} segments", strategyInfo, cfName,
113      numOfSegments);
114    return Action.COMPACT;
115  }
116}