001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
022import org.apache.hadoop.util.StringUtils;
023import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellComparator;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.MemoryCompactionPolicy;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.ClassSize;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.wal.WAL;
042
043/**
044 * A memstore implementation which supports in-memory compaction.
045 * A compaction pipeline is added between the active set and the snapshot data structures;
046 * it consists of a list of segments that are subject to compaction.
047 * Like the snapshot, all pipeline segments are read-only; updates only affect the active set.
048 * To ensure this property we take advantage of the existing blocking mechanism -- the active set
049 * is pushed to the pipeline while holding the region's updatesLock in exclusive mode.
050 * Periodically, a compaction is applied in the background to all pipeline segments resulting
 * in a single read-only component. The "old" segments are discarded when no scanner is reading
052 * them.
053 */
054@InterfaceAudience.Private
055public class CompactingMemStore extends AbstractMemStore {
056
  // Configuration key for the external setting of the compacting MemStore behaviour
  public static final String COMPACTING_MEMSTORE_TYPE_KEY =
      "hbase.hregion.compacting.memstore.type";
  // Default for the key above: the string form of MemoryCompactionPolicy.NONE
  public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
      String.valueOf(MemoryCompactionPolicy.NONE);
  // Configuration key for the fraction of in-memory-flush size w.r.t. flush-to-disk size
  public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
      "hbase.memstore.inmemoryflush.threshold.factor";
  // Fraction used when the key above is not set in the configuration
  private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014;
  // In-Memory compaction pool size (configuration key and its default).
  // NOTE(review): "CONPACTION" is a misspelling of "COMPACTION"; both constants are public,
  // so the names are kept unchanged for compatibility.
  public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
      "hbase.regionserver.inmemory.compaction.pool.size";
  public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
  // The store this memstore belongs to; supplies the column family name and the smallest
  // read point
  private HStore store;
  // The read-only segments that sit between the active set and the snapshot
  private CompactionPipeline pipeline;
  // Started by flushInMemory() and stopped by stopCompaction()
  protected MemStoreCompactor compactor;

  private long inmemoryFlushSize;       // the threshold on active size for in-memory flush
  // CASed from false to true in shouldFlushInMemory(); reset to false when flushInMemory() ends
  private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);

  // inWalReplay is true while we are synchronously replaying the edits from WAL
  private boolean inWalReplay = false;

  // When false, flushInMemory() returns right after pushing the active segment into the
  // pipeline and does not start the compactor
  @VisibleForTesting
  protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
  // When true, snapshot() moves the whole pipeline into the snapshot; otherwise only the tail
  private boolean compositeSnapshot = true;
085
  /**
   * Types of indexes (part of immutable segments) to be used after flattening,
   * compaction, or merge are applied.
   */
  public enum IndexType {
    CSLM_MAP,   // ConcurrentSkipListMap
    ARRAY_MAP,  // CellArrayMap
    CHUNK_MAP   // CellChunkMap
  }
095
  // Index type applied to immutable segments. The constructor overwrites this initial value
  // according to the MSLAB configuration; setIndexType() can change it in tests.
  private IndexType indexType = IndexType.ARRAY_MAP;  // default implementation

  // Aligned sum of the parent's deep overhead, this class's own fields, and the deep overhead
  // of the pipeline and the compactor.
  public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
      + 6 * ClassSize.REFERENCE     // Store, CompactionPipeline,
                                    // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
                                    // indexType
      + Bytes.SIZEOF_LONG           // inmemoryFlushSize
      + 2 * Bytes.SIZEOF_BOOLEAN    // compositeSnapshot and inWalReplay
      + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
      + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
106
107  public CompactingMemStore(Configuration conf, CellComparator c,
108      HStore store, RegionServicesForStores regionServices,
109      MemoryCompactionPolicy compactionPolicy) throws IOException {
110    super(conf, c, regionServices);
111    this.store = store;
112    this.regionServices = regionServices;
113    this.pipeline = new CompactionPipeline(getRegionServices());
114    this.compactor = createMemStoreCompactor(compactionPolicy);
115    if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
116      // if user requested to work with MSLABs (whether on- or off-heap), then the
117      // immutable segments are going to use CellChunkMap as their index
118      indexType = IndexType.CHUNK_MAP;
119    } else {
120      indexType = IndexType.ARRAY_MAP;
121    }
122    // initialization of the flush size should happen after initialization of the index type
123    // so do not transfer the following method
124    initInmemoryFlushSize(conf);
125    LOG.info("Store={}, in-memory flush size threshold={}, immutable segments index type={}, " +
126            "compactor={}", this.store.getColumnFamilyName(),
127        StringUtils.byteDesc(this.inmemoryFlushSize), this.indexType,
128        (this.compactor == null? "NULL": this.compactor.toString()));
129  }
130
  /**
   * Creates the compactor used by this memstore. Called from the constructor; being
   * protected, it can be overridden (e.g. by tests) to supply a different compactor.
   *
   * @param compactionPolicy policy handed to the {@link MemStoreCompactor} constructor
   * @return a new compactor bound to this memstore
   * @throws IllegalArgumentIOException propagated from the {@link MemStoreCompactor} constructor
   */
  @VisibleForTesting
  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
      throws IllegalArgumentIOException {
    return new MemStoreCompactor(this, compactionPolicy);
  }
136
137  private void initInmemoryFlushSize(Configuration conf) {
138    double factor = 0;
139    long memstoreFlushSize = getRegionServices().getMemStoreFlushSize();
140    int numStores = getRegionServices().getNumStores();
141    if (numStores <= 1) {
142      // Family number might also be zero in some of our unit test case
143      numStores = 1;
144    }
145    inmemoryFlushSize = memstoreFlushSize / numStores;
146    // multiply by a factor (the same factor for all index types)
147    factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
148          IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);
149
150    inmemoryFlushSize = (long) (inmemoryFlushSize * factor);
151  }
152
153  /**
154   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
155   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and
156   *         the memstore may be changed while computing its size. It is the responsibility of the
157   *         caller to make sure this doesn't happen.
158   */
159  @Override
160  public MemStoreSize size() {
161    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
162    memstoreSizing.incMemStoreSize(active.getMemStoreSize());
163    for (Segment item : pipeline.getSegments()) {
164      memstoreSizing.incMemStoreSize(item.getMemStoreSize());
165    }
166    return memstoreSizing.getMemStoreSize();
167  }
168
169  /**
170   * This method is called before the flush is executed.
171   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
172   * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
173   */
174  @Override
175  public long preFlushSeqIDEstimation() {
176    if(compositeSnapshot) {
177      return HConstants.NO_SEQNUM;
178    }
179    Segment segment = getLastSegment();
180    if(segment == null) {
181      return HConstants.NO_SEQNUM;
182    }
183    return segment.getMinSequenceId();
184  }
185
  /**
   * @return always {@code true} for this memstore implementation
   */
  @Override
  public boolean isSloppy() {
    return true;
  }
190
191  /**
192   * Push the current active memstore segment into the pipeline
193   * and create a snapshot of the tail of current compaction pipeline
194   * Snapshot must be cleared by call to {@link #clearSnapshot}.
195   * {@link #clearSnapshot(long)}.
196   * @return {@link MemStoreSnapshot}
197   */
198  @Override
199  public MemStoreSnapshot snapshot() {
200    // If snapshot currently has entries, then flusher failed or didn't call
201    // cleanup.  Log a warning.
202    if (!this.snapshot.isEmpty()) {
203      LOG.warn("Snapshot called again without clearing previous. " +
204          "Doing nothing. Another ongoing flush or did we fail last attempt?");
205    } else {
206      LOG.debug("FLUSHING TO DISK {}, store={}",
207            getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
208      stopCompaction();
209      pushActiveToPipeline(this.active);
210      snapshotId = EnvironmentEdgeManager.currentTime();
211      // in both cases whatever is pushed to snapshot is cleared from the pipeline
212      if (compositeSnapshot) {
213        pushPipelineToSnapshot();
214      } else {
215        pushTailToSnapshot();
216      }
217      compactor.resetStats();
218    }
219    return new MemStoreSnapshot(snapshotId, this.snapshot);
220  }
221
222  @Override
223  public MemStoreSize getFlushableSize() {
224    MemStoreSize mss = getSnapshotSize();
225    if (mss.getDataSize() == 0) {
226      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
227      if (compositeSnapshot) {
228        MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
229        memStoreSizing.incMemStoreSize(this.active.getMemStoreSize());
230        mss = memStoreSizing.getMemStoreSize();
231      } else {
232        mss = pipeline.getTailSize();
233      }
234    }
235    return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
236  }
237
238  @Override
239  protected long keySize() {
240    // Need to consider dataSize/keySize of all segments in pipeline and active
241    long keySize = this.active.getDataSize();
242    for (Segment segment : this.pipeline.getSegments()) {
243      keySize += segment.getDataSize();
244    }
245    return keySize;
246  }
247
248  @Override
249  protected long heapSize() {
250    // Need to consider heapOverhead of all segments in pipeline and active
251    long h = this.active.getHeapSize();
252    for (Segment segment : this.pipeline.getSegments()) {
253      h += segment.getHeapSize();
254    }
255    return h;
256  }
257
258  @Override
259  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
260    long minSequenceId = pipeline.getMinSequenceId();
261    if(minSequenceId != Long.MAX_VALUE) {
262      byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
263      byte[] familyName = getFamilyNameInBytes();
264      WAL WAL = getRegionServices().getWAL();
265      if (WAL != null) {
266        WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
267      }
268    }
269  }
270
  /**
   * Informs the MemStore that the upcoming updates are part of replaying edits from the WAL.
   * While the flag set here is on, {@link #shouldFlushInMemory()} never requests an
   * in-memory flush.
   */
  @Override
  public void startReplayingFromWAL() {
    inWalReplay = true;
  }
279
  /**
   * Informs the MemStore that replaying edits from the WAL is done; clears the flag set by
   * {@link #startReplayingFromWAL()}.
   */
  @Override
  public void stopReplayingFromWAL() {
    inWalReplay = false;
  }
288
289  // the getSegments() method is used for tests only
290  @VisibleForTesting
291  @Override
292  protected List<Segment> getSegments() {
293    List<? extends Segment> pipelineList = pipeline.getSegments();
294    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
295    list.add(this.active);
296    list.addAll(pipelineList);
297    list.addAll(this.snapshot.getAllSegments());
298
299    return list;
300  }
301
  /**
   * Switches between the two snapshot modes used by {@link #snapshot()}.
   *
   * @param useCompositeSnapshot {@code true} to push the whole pipeline into the snapshot,
   *          {@code false} to push only the pipeline tail
   */
  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
    this.compositeSnapshot = useCompositeSnapshot;
  }
306
307  public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
308      boolean merge) {
309    // last true stands for updating the region size
310    return pipeline.swap(versionedList, result, !merge, true);
311  }
312
  /**
   * Delegates to the pipeline to flatten one segment, using this memstore's current index
   * type.
   *
   * @param requesterVersion The caller must hold the VersionedList of the pipeline
   *           with version taken earlier. This version must be passed as a parameter here.
   *           The flattening happens only if versions match.
   * @param action passed through unchanged to the pipeline
   */
  public void flattenOneSegment(long requesterVersion,  MemStoreCompactionStrategy.Action action) {
    pipeline.flattenOneSegment(requesterVersion, indexType, action);
  }
321
  /**
   * Overrides the index type of the immutable segments. The setter is used only for
   * testability.
   *
   * @param type the index type to use from now on
   */
  @VisibleForTesting
  void setIndexType(IndexType type) {
    indexType = type;
    // Because this functionality is for testing only and tests are setting in-memory flush size
    // according to their need, there is no setting of in-memory flush size, here.
    // If it is needed, please change in-memory flush size explicitly
  }
330
  /**
   * @return the index type currently configured for immutable segments
   */
  public IndexType getIndexType() {
    return indexType;
  }
334
  /**
   * @return {@code true} if the compaction pipeline is not empty
   */
  public boolean hasImmutableSegments() {
    return !pipeline.isEmpty();
  }
338
  /**
   * @return the pipeline's versioned list of segments
   */
  public VersionedSegmentsList getImmutableSegments() {
    return pipeline.getVersionedList();
  }
342
  /**
   * @return the smallest read point as reported by the store
   */
  public long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }
346
  /**
   * @return the store this memstore was created for
   */
  public HStore getStore() {
    return store;
  }
350
  /**
   * @return the column family name of the store, converted from bytes to a string
   */
  public String getFamilyName() {
    return Bytes.toString(getFamilyNameInBytes());
  }
354
355  @Override
356  /*
357   * Scanners are ordered from 0 (oldest) to newest in increasing order.
358   */
359  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
360    MutableSegment activeTmp = active;
361    List<? extends Segment> pipelineList = pipeline.getSegments();
362    List<? extends Segment> snapshotList = snapshot.getAllSegments();
363    long order = 1L + pipelineList.size() + snapshotList.size();
364    // The list of elements in pipeline + the active element + the snapshot segment
365    // The order is the Segment ordinal
366    List<KeyValueScanner> list = createList((int) order);
367    order = addToScanners(activeTmp, readPt, order, list);
368    order = addToScanners(pipelineList, readPt, order, list);
369    addToScanners(snapshotList, readPt, order, list);
370    return list;
371  }
372
   /**
    * Creates the list that {@link #getScanners(long)} fills with scanners. Protected so that
    * it can be overridden (e.g. by tests).
    *
    * @param capacity initial capacity of the list
    * @return a new, empty list
    */
   @VisibleForTesting
   protected List<KeyValueScanner> createList(int capacity) {
     return new ArrayList<>(capacity);
   }
377
378  /**
379   * Check whether anything need to be done based on the current active set size.
380   * The method is invoked upon every addition to the active set.
381   * For CompactingMemStore, flush the active set to the read-only memory if it's
382   * size is above threshold
383   */
384  @Override
385  protected void checkActiveSize() {
386    if (shouldFlushInMemory()) {
387      /* The thread is dispatched to flush-in-memory. This cannot be done
388      * on the same thread, because for flush-in-memory we require updatesLock
389      * in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock
390      * in the shared mode. */
391      InMemoryFlushRunnable runnable = new InMemoryFlushRunnable();
392      if (LOG.isTraceEnabled()) {
393        LOG.trace(
394          "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
395      }
396      getPool().execute(runnable);
397    }
398  }
399
400  // internally used method, externally visible only for tests
401  // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
402  // otherwise there is a deadlock
403  @VisibleForTesting
404  void flushInMemory() throws IOException {
405    // setting the inMemoryFlushInProgress flag again for the case this method is invoked
406    // directly (only in tests) in the common path setting from true to true is idempotent
407    inMemoryFlushInProgress.set(true);
408    try {
409      // Phase I: Update the pipeline
410      getRegionServices().blockUpdates();
411      try {
412        LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
413        pushActiveToPipeline(this.active);
414      } finally {
415        getRegionServices().unblockUpdates();
416      }
417
418      // Used by tests
419      if (!allowCompaction.get()) {
420        return;
421      }
422      // Phase II: Compact the pipeline
423      try {
424        // Speculative compaction execution, may be interrupted if flush is forced while
425        // compaction is in progress
426        compactor.start();
427      } catch (IOException e) {
428        LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}",
429            getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
430      }
431    } finally {
432      inMemoryFlushInProgress.set(false);
433      LOG.trace("IN-MEMORY FLUSH: end");
434    }
435  }
436
437  private Segment getLastSegment() {
438    Segment localActive = getActive();
439    Segment tail = pipeline.getTail();
440    return tail == null ? localActive : tail;
441  }
442
  /**
   * @return the name of the store's column family, as returned by its descriptor
   */
  private byte[] getFamilyNameInBytes() {
    return store.getColumnFamilyDescriptor().getName();
  }
446
  /**
   * @return the in-memory compaction thread pool provided by the region services
   */
  private ThreadPoolExecutor getPool() {
    return getRegionServices().getInMemoryCompactionPool();
  }
450
451  @VisibleForTesting
452  protected boolean shouldFlushInMemory() {
453    if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold
454      if (inWalReplay) {  // when replaying edits from WAL there is no need in in-memory flush
455        return false;     // regardless the size
456      }
457      // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
458      // the insert of the active into the compaction pipeline
459      return (inMemoryFlushInProgress.compareAndSet(false,true));
460    }
461    return false;
462  }
463
  /**
   * Requests cancellation of the asynchronous compaction task (caused by an in-memory flush).
   * The compactor is asked to stop only while an in-memory flush is marked as in progress.
   * The compaction may still happen if the request was sent too late.
   * Non-blocking request.
   */
  private void stopCompaction() {
    if (inMemoryFlushInProgress.get()) {
      compactor.stop();
    }
  }
474
475  protected void pushActiveToPipeline(MutableSegment active) {
476    if (!active.isEmpty()) {
477      pipeline.pushHead(active);
478      resetActive();
479    }
480  }
481
  /**
   * Makes the tail of the pipeline the snapshot and then removes it from the pipeline.
   * The snapshot is set first, so that the segments are referenced by the snapshot before
   * they leave the pipeline.
   */
  private void pushTailToSnapshot() {
    VersionedSegmentsList segments = pipeline.getVersionedTail();
    pushToSnapshot(segments.getStoreSegments());
    // In Swap: don't close segments (they are in snapshot now) and don't update the region size
    pipeline.swap(segments,null,false, false);
  }
488
489  private void pushPipelineToSnapshot() {
490    int iterationsCnt = 0;
491    boolean done = false;
492    while (!done) {
493      iterationsCnt++;
494      VersionedSegmentsList segments = pipeline.getVersionedList();
495      pushToSnapshot(segments.getStoreSegments());
496      // swap can return false in case the pipeline was updated by ongoing compaction
497      // and the version increase, the chance of it happenning is very low
498      // In Swap: don't close segments (they are in snapshot now) and don't update the region size
499      done = pipeline.swap(segments, null, false, false);
500      if (iterationsCnt>2) {
501        // practically it is impossible that this loop iterates more than two times
502        // (because the compaction is stopped and none restarts it while in snapshot request),
503        // however stopping here for the case of the infinite loop causing by any error
504        LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," +
505            " while flushing to disk.");
506        this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
507        break;
508      }
509    }
510  }
511
512  private void pushToSnapshot(List<ImmutableSegment> segments) {
513    if(segments.isEmpty()) return;
514    if(segments.size() == 1 && !segments.get(0).isEmpty()) {
515      this.snapshot = segments.get(0);
516      return;
517    } else { // create composite snapshot
518      this.snapshot =
519          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
520    }
521  }
522
  /**
   * @return the region services this memstore was constructed with
   */
  private RegionServicesForStores getRegionServices() {
    return regionServices;
  }
526
527  /**
528  * The in-memory-flusher thread performs the flush asynchronously.
529  * There is at most one thread per memstore instance.
530  * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
531  * and compacts the pipeline.
532  */
533  private class InMemoryFlushRunnable implements Runnable {
534
535    @Override
536    public void run() {
537      try {
538        flushInMemory();
539      } catch (IOException e) {
540        LOG.warn("Unable to run memstore compaction. region "
541            + getRegionServices().getRegionInfo().getRegionNameAsString()
542            + "store: "+ getFamilyName(), e);
543      }
544    }
545  }
546
  /**
   * @return the current value of the in-memory-flush-in-progress flag
   */
  @VisibleForTesting
  boolean isMemStoreFlushingInMemory() {
    return inMemoryFlushInProgress.get();
  }
551
552  /**
553   * @param cell Find the row that comes after this one.  If null, we return the
554   *             first.
555   * @return Next row or null if none found.
556   */
557  Cell getNextRow(final Cell cell) {
558    Cell lowest = null;
559    List<Segment> segments = getSegments();
560    for (Segment segment : segments) {
561      if (lowest == null) {
562        lowest = getNextRow(cell, segment.getCellSet());
563      } else {
564        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
565      }
566    }
567    return lowest;
568  }
569
  /**
   * @return the threshold on the active segment's size that triggers an in-memory flush
   */
  @VisibleForTesting
  long getInmemoryFlushSize() {
    return inmemoryFlushSize;
  }
574
575  // debug method
576  public void debug() {
577    String msg = "active size=" + this.active.getDataSize();
578    msg += " in-memory flush size is "+ inmemoryFlushSize;
579    msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
580    msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");
581    LOG.debug(msg);
582  }
583
584}