/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A memstore implementation which supports in-memory compaction. A compaction pipeline is added
 * between the active set and the snapshot data structures; it consists of a list of segments that
 * are subject to compaction. Like the snapshot, all pipeline segments are read-only; updates only
 * affect the active set. To ensure this property we take advantage of the existing blocking
 * mechanism -- the active set is pushed to the pipeline while holding the region's updatesLock in
 * exclusive mode. Periodically, a compaction is applied in the background to all pipeline segments
 * resulting in a single read-only component. The "old" segments are discarded when no scanner is
 * reading them.
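 * <p>
 * A minimal sketch of selecting a compaction policy through the configuration key defined below as
 * {@code COMPACTING_MEMSTORE_TYPE_KEY} ({@code BASIC} and {@code EAGER} are the compacting
 * {@link MemoryCompactionPolicy} options; the default, {@code NONE}, disables in-memory
 * compaction):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
 *   String.valueOf(MemoryCompactionPolicy.BASIC));
 * }</pre>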
 */
@InterfaceAudience.Private
public class CompactingMemStore extends AbstractMemStore {

  // The external setting of the compacting MemStore behaviour
  public static final String COMPACTING_MEMSTORE_TYPE_KEY =
    "hbase.hregion.compacting.memstore.type";
  public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
    String.valueOf(MemoryCompactionPolicy.NONE);
  // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
  public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
    "hbase.memstore.inmemoryflush.threshold.factor";
  private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1;
  // In-memory compaction pool size
  public static final String IN_MEMORY_COMPACTION_POOL_SIZE_KEY =
    "hbase.regionserver.inmemory.compaction.pool.size";
  public static final int IN_MEMORY_COMPACTION_POOL_SIZE_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
  private HStore store;
  private CompactionPipeline pipeline;
  protected MemStoreCompactor compactor;

  private long inmemoryFlushSize; // the threshold on active size for in-memory flush
  private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false);

  // inWalReplay is true while we are synchronously replaying the edits from WAL
  private boolean inWalReplay = false;

  protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
  private boolean compositeSnapshot = true;

  /**
   * Types of indexes (part of immutable segments) to be used after flattening, compaction, or merge
   * are applied.
   */
  public enum IndexType {
    CSLM_MAP, // ConcurrentSkipListMap
    ARRAY_MAP, // CellArrayMap
    CHUNK_MAP // CellChunkMap
  }

  private IndexType indexType = IndexType.ARRAY_MAP; // default implementation

  public static final long DEEP_OVERHEAD = ClassSize.align(AbstractMemStore.DEEP_OVERHEAD
    // store, pipeline, compactor, inMemoryCompactionInProgress, allowCompaction, indexType
    + 6 * ClassSize.REFERENCE
    + Bytes.SIZEOF_LONG // inmemoryFlushSize
    + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
    + 2 * ClassSize.ATOMIC_BOOLEAN // inMemoryCompactionInProgress and allowCompaction
    + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);

  public CompactingMemStore(Configuration conf, CellComparator c, HStore store,
    RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
    throws IOException {
    super(conf, c, regionServices);
    this.store = store;
    this.regionServices = regionServices;
    this.pipeline = new CompactionPipeline(getRegionServices());
    this.compactor = createMemStoreCompactor(compactionPolicy);
    if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
      // if user requested to work with MSLABs (whether on- or off-heap), then the
      // immutable segments are going to use CellChunkMap as their index
      indexType = IndexType.CHUNK_MAP;
    } else {
      indexType = IndexType.ARRAY_MAP;
    }
    // initialization of the flush size must happen after initialization of the index type,
    // so do not move the following call earlier
    initInmemoryFlushSize(conf);
    LOG.info(
      "Store={}, in-memory flush size threshold={}, immutable segments index type={}, "
        + "compactor={}",
      this.store.getColumnFamilyName(), StringUtils.byteDesc(this.inmemoryFlushSize),
      this.indexType, (this.compactor == null ? "NULL" : this.compactor.toString()));
  }

  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
    throws IllegalArgumentIOException {
    return new MemStoreCompactor(this, compactionPolicy);
  }

  private void initInmemoryFlushSize(Configuration conf) {
    long memstoreFlushSize = getRegionServices().getMemStoreFlushSize();
    int numStores = getRegionServices().getNumStores();
    if (numStores <= 1) {
      // the number of families might also be zero in some of our unit test cases
      numStores = 1;
    }
    double factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0);
    if (factor != 0.0) {
      // multiply by a factor (the same factor for all index types)
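      // e.g., with the hypothetical values memstoreFlushSize = 128 MB, factor = 0.25 and
      // numStores = 2, the per-store in-memory flush threshold is (0.25 * 128 MB) / 2 = 16 MB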
      inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores;
    } else {
      inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER
        * conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
      inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER;
    }
  }

  /**
   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and the
   *         memstore may be changed while computing its size. It is the responsibility of the
   *         caller to make sure this doesn't happen.
   */
  @Override
  public MemStoreSize size() {
    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
    memstoreSizing.incMemStoreSize(getActive().getMemStoreSize());
    for (Segment item : pipeline.getSegments()) {
      memstoreSizing.incMemStoreSize(item.getMemStoreSize());
    }
    return memstoreSizing.getMemStoreSize();
  }

  /**
   * This method is called before the flush is executed.
   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
   *         is executed. If the memstore is going to be cleared, returns
   *         {@code HConstants.NO_SEQNUM}.
   */
  @Override
  public long preFlushSeqIDEstimation() {
    if (compositeSnapshot) {
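      // a composite snapshot will carry the whole memstore (active segment and pipeline), which
      // is thus expected to be cleared by the flush; per the contract above, return NO_SEQNUM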
      return HConstants.NO_SEQNUM;
    }
    Segment segment = getLastSegment();
    if (segment == null) {
      return HConstants.NO_SEQNUM;
    }
    return segment.getMinSequenceId();
  }

  @Override
  public boolean isSloppy() {
    return true;
  }

  /**
   * Push the current active memstore segment into the pipeline and create a snapshot of the tail
   * of the current compaction pipeline. The snapshot must be cleared by a call to
   * {@link #clearSnapshot(long)}.
   * @return {@link MemStoreSnapshot}
   */
  @Override
  public MemStoreSnapshot snapshot() {
    // If snapshot currently has entries, then flusher failed or didn't call
    // cleanup. Log a warning.
    if (!this.snapshot.isEmpty()) {
      LOG.warn("Snapshot called again without clearing previous. "
        + "Doing nothing. Is there another ongoing flush, or did the last attempt fail?");
    } else {
      LOG.debug("FLUSHING TO DISK {}, store={}",
        getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
      stopCompaction();
      // region level lock ensures pushing active to pipeline is done in isolation
      // no concurrent update operations trying to flush the active segment
      pushActiveToPipeline(getActive(), true);
      resetTimeOfOldestEdit();
      snapshotId = EnvironmentEdgeManager.currentTime();
      // in both cases whatever is pushed to snapshot is cleared from the pipeline
      if (compositeSnapshot) {
        pushPipelineToSnapshot();
      } else {
        pushTailToSnapshot();
      }
      compactor.resetStats();
    }
    return new MemStoreSnapshot(snapshotId, this.snapshot);
  }

223
224  @Override
225  public MemStoreSize getFlushableSize() {
226    MemStoreSize mss = getSnapshotSize();
227    if (mss.getDataSize() == 0) {
228      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
229      if (compositeSnapshot) {
230        MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
231        MutableSegment currActive = getActive();
232        if (!currActive.isEmpty()) {
233          memStoreSizing.incMemStoreSize(currActive.getMemStoreSize());
234        }
235        mss = memStoreSizing.getMemStoreSize();
236      } else {
237        mss = pipeline.getTailSize();
238      }
239    }
240    return mss.getDataSize() > 0 ? mss : getActive().getMemStoreSize();
241  }
242
  public void setInMemoryCompactionCompleted() {
    inMemoryCompactionInProgress.set(false);
  }

  protected boolean setInMemoryCompactionFlag() {
    return inMemoryCompactionInProgress.compareAndSet(false, true);
  }

  @Override
  protected long keySize() {
    // Need to consider dataSize/keySize of all segments in pipeline and active
    long keySize = getActive().getDataSize();
    for (Segment segment : this.pipeline.getSegments()) {
      keySize += segment.getDataSize();
    }
    return keySize;
  }

  @Override
  protected long heapSize() {
    // Need to consider heapOverhead of all segments in pipeline and active
    long h = getActive().getHeapSize();
    for (Segment segment : this.pipeline.getSegments()) {
      h += segment.getHeapSize();
    }
    return h;
  }

  @Override
  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
    long minSequenceId = pipeline.getMinSequenceId();
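    // a minSequenceId of Long.MAX_VALUE means the pipeline has no sequence id to report
    // (e.g. it is empty), so in that case there is nothing to update in the WAL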
    if (minSequenceId != Long.MAX_VALUE) {
      byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
      byte[] familyName = getFamilyNameInBytes();
      WAL wal = getRegionServices().getWAL();
      if (wal != null) {
        wal.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
      }
    }
  }

  /**
   * This message intends to inform the MemStore that the upcoming updates are part of replaying
   * edits from the WAL
   */
  @Override
  public void startReplayingFromWAL() {
    inWalReplay = true;
  }

  /**
   * This message intends to inform the MemStore that the replaying of edits from the WAL is done
   */
  @Override
  public void stopReplayingFromWAL() {
    inWalReplay = false;
  }

  /**
   * Issue any synchronization and tests needed before applying the update. For the compacting
   * memstore this means checking that the update can increase the size without overflow.
   * @param currentActive  the segment to be updated
   * @param cell           the cell to be added
   * @param memstoreSizing object to accumulate region size changes
   * @return true iff we can proceed with applying the update
   */
  @Override
  protected boolean preUpdate(MutableSegment currentActive, ExtendedCell cell,
    MemStoreSizing memstoreSizing) {
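    // the shared (read) lock coordinates this writer with an in-memory flush of the segment;
    // on a successful add the lock stays held and is released in postUpdate(), otherwise it
    // is released right here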
    if (currentActive.sharedLock()) {
      if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) {
        return true;
      }
      currentActive.sharedUnlock();
    }
    return false;
  }

  @Override
  protected void postUpdate(MutableSegment currentActive) {
    currentActive.sharedUnlock();
  }

  @Override
  protected boolean sizeAddedPreOperation() {
    return true;
  }

  // the getSegments() method is used for tests only
  @Override
  protected List<Segment> getSegments() {
    List<? extends Segment> pipelineList = pipeline.getSegments();
    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
    list.add(getActive());
    list.addAll(pipelineList);
    list.addAll(snapshot.getAllSegments());

    return list;
  }

  // the following method allows one to manipulate the composite snapshot setting
  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
    this.compositeSnapshot = useCompositeSnapshot;
  }

  public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
    boolean merge) {
    // last true stands for updating the region size
    return pipeline.swap(versionedList, result, !merge, true);
  }

  /**
   * Flattens one segment of the compaction pipeline, provided the pipeline version matches.
   * @param requesterVersion The caller must hold the VersionedList of the pipeline with the
   *                         version taken earlier. This version must be passed as a parameter
   *                         here. The flattening happens only if the versions match.
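   *                         <p>
   *                         A hedged usage sketch, assuming a {@code getVersion()} accessor on
   *                         {@link VersionedSegmentsList}:
   *
   *                         <pre>{@code
   * VersionedSegmentsList list = memstore.getImmutableSegments();
   * memstore.flattenOneSegment(list.getVersion(),
   *   MemStoreCompactionStrategy.Action.FLATTEN);
   * }</pre>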
   */
  public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) {
    pipeline.flattenOneSegment(requesterVersion, indexType, action);
  }

  // setter is used only for testability
  void setIndexType(IndexType type) {
    indexType = type;
    // Because this functionality is for testing only, and tests set the in-memory flush size
    // according to their needs, the in-memory flush size is not set here.
    // If it is needed, please change the in-memory flush size explicitly
  }

  public IndexType getIndexType() {
    return indexType;
  }

  public boolean hasImmutableSegments() {
    return !pipeline.isEmpty();
  }

  public VersionedSegmentsList getImmutableSegments() {
    return pipeline.getVersionedList();
  }

  public long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  public HStore getStore() {
    return store;
  }

  public String getFamilyName() {
    return Bytes.toString(getFamilyNameInBytes());
  }

  /**
   * This method is protected under {@link HStore#lock} read lock.
   */
  @Override
  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
    MutableSegment activeTmp = getActive();
    List<? extends Segment> pipelineList = pipeline.getSegments();
    List<? extends Segment> snapshotList = snapshot.getAllSegments();
    long numberOfSegments = 1L + pipelineList.size() + snapshotList.size();
    // The list of elements in pipeline + the active element + the snapshot segment
    List<KeyValueScanner> list = createList((int) numberOfSegments);
    addToScanners(activeTmp, readPt, list);
    addToScanners(pipelineList, readPt, list);
    addToScanners(snapshotList, readPt, list);
    return list;
  }

  protected List<KeyValueScanner> createList(int capacity) {
    return new ArrayList<>(capacity);
  }

  /**
   * Check whether anything needs to be done based on the current active set size. The method is
   * invoked upon every addition to the active set. For CompactingMemStore, flush the active set
   * to the read-only memory if its size is above the threshold.
   * @param currActive     intended segment to update
   * @param cellToAdd      cell to be added to the segment
   * @param memstoreSizing object to accumulate changed size
   * @return true if the cell can be added to the currActive
   */
  protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
    MemStoreSizing memstoreSizing) {
    long cellSize = MutableSegment.getCellLength(cellToAdd);
    boolean successAdd = false;
    while (true) {
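      // CAS loop: atomically reserve cellSize bytes in the active segment's data size, retrying
      // whenever a concurrent writer changed the size between the read and the compareAndSet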
      long segmentDataSize = currActive.getDataSize();
      if (!inWalReplay && segmentDataSize > inmemoryFlushSize) {
        // while replaying edits from the WAL there is no need for an in-memory flush regardless
        // of the size; otherwise, once the size is above the flush threshold, stop adding here
        break;
      }
      if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
        if (memstoreSizing != null) {
          memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
        }
        successAdd = true;
        break;
      }
    }

    if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) {
      // size above flush threshold so we flush in memory
      this.tryFlushInMemoryAndCompactingAsync(currActive);
    }
    return successAdd;
  }

  /**
   * Try to flush the currActive in memory and submit the background
   * {@link InMemoryCompactionRunnable} to
   * {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the
   * actual flushing in memory.
   * @param currActive the current active segment to be flushed in memory
   */
  private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) {
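    // concurrent writers that all observed the size threshold race here; setInMemoryFlushed()
    // succeeds for exactly one of them, so only the winner pushes the segment and dispatches
    // the background compaction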
    if (currActive.setInMemoryFlushed()) {
      flushInMemory(currActive);
      if (setInMemoryCompactionFlag()) {
        // The thread is dispatched to do in-memory compaction in the background
        InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
        }
        getPool().execute(runnable);
      }
    }
  }

  // externally visible only for tests
  // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
  // otherwise there is a deadlock
  void flushInMemory() {
    MutableSegment currActive = getActive();
    if (currActive.setInMemoryFlushed()) {
      flushInMemory(currActive);
    }
    inMemoryCompaction();
  }

  protected void flushInMemory(MutableSegment currActive) {
    LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
    // NOTE: Due to concurrent writes, and because we first add the cell size to
    // currActive.getDataSize and only then add the cell to currActive.cellSet, it is possible
    // that currActive.getDataSize cannot accommodate cellToAdd while currActive.cellSet is
    // still empty, because pending writes have not yet added their cells to currActive.cellSet.
    // So we should not check whether currActive is empty here.
    pushActiveToPipeline(currActive, false);
  }

  void inMemoryCompaction() {
    // set the inMemoryCompactionInProgress flag again for the case this method is invoked
    // directly (only from tests); in the common path setting it from true to true is idempotent
    inMemoryCompactionInProgress.set(true);
    // Used by tests
    if (!allowCompaction.get()) {
      return;
    }
    try {
      // Speculative compaction execution, may be interrupted if flush is forced while
      // compaction is in progress
      if (!compactor.start()) {
        setInMemoryCompactionCompleted();
      }
    } catch (IOException e) {
      LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}",
        getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
    }
  }

  private Segment getLastSegment() {
    Segment localActive = getActive();
    Segment tail = pipeline.getTail();
    return tail == null ? localActive : tail;
  }

  private byte[] getFamilyNameInBytes() {
    return store.getColumnFamilyDescriptor().getName();
  }

  private ThreadPoolExecutor getPool() {
    return getRegionServices().getInMemoryCompactionPool();
  }

  /**
   * The request to cancel the asynchronous compaction task (caused by an in-memory flush). The
   * compaction may still happen if the request was sent too late. This is a non-blocking request.
   */
  private void stopCompaction() {
    if (inMemoryCompactionInProgress.get()) {
      compactor.stop();
    }
  }

  /**
   * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, then
   * due to concurrent writes, and because we first add the cell size to currActive.getDataSize
   * and only then add the cell to currActive.cellSet, it is possible that currActive.getDataSize
   * cannot accommodate cellToAdd while currActive.cellSet is still empty because of pending
   * writes. For {@link CompactingMemStore#flushInMemory(MutableSegment)} the checkEmpty parameter
   * is therefore false. But when {@link CompactingMemStore#snapshot} calls this method there are
   * no pending writes, so the checkEmpty parameter can be true.
   */
  protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) {
    if (!checkEmpty || !currActive.isEmpty()) {
      pipeline.pushHead(currActive);
      resetActive();
    }
  }

  private void pushTailToSnapshot() {
    VersionedSegmentsList segments = pipeline.getVersionedTail();
    pushToSnapshot(segments.getStoreSegments());
    // In Swap: don't close segments (they are in snapshot now) and don't update the region size
    pipeline.swap(segments, null, false, false);
  }

  private void pushPipelineToSnapshot() {
    int iterationsCnt = 0;
    boolean done = false;
    while (!done) {
      iterationsCnt++;
      VersionedSegmentsList segments = getImmutableSegments();
      pushToSnapshot(segments.getStoreSegments());
      // swap can return false when the pipeline was updated by an ongoing compaction and the
      // version increased; the chance of this happening is very low
      // In Swap: don't close segments (they are in snapshot now) and don't update the region size
      done = swapPipelineWithNull(segments);
      if (iterationsCnt > 2) {
        // practically it is impossible for this loop to iterate more than two times
        // (because compaction is stopped and nothing restarts it while a snapshot is requested),
        // however we stop here to guard against an infinite loop caused by any error
        LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot,"
          + " while flushing to disk.");
        this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
        break;
      }
    }
  }

  protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
    return pipeline.swap(segments, null, false, false);
  }

  private void pushToSnapshot(List<ImmutableSegment> segments) {
    if (segments.isEmpty()) {
      return;
    }
    if (segments.size() == 1 && !segments.get(0).isEmpty()) {
      this.snapshot = segments.get(0);
    } else { // create composite snapshot
      this.snapshot =
        SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
    }
  }

  private RegionServicesForStores getRegionServices() {
    return regionServices;
  }

  /**
   * The in-memory-flusher thread performs the compaction asynchronously. There is at most one
   * such thread per memstore instance. The active segment has already been pushed into the
   * pipeline by the writer that won the in-memory flush race; this runnable compacts the
   * pipeline.
   */
  private class InMemoryCompactionRunnable implements Runnable {
    @Override
    public void run() {
      inMemoryCompaction();
    }
  }

  boolean isMemStoreFlushingInMemory() {
    return inMemoryCompactionInProgress.get();
  }

  /**
   * @param cell Find the row that comes after this one. If null, we return the first.
   * @return Next row or null if none found.
   */
  ExtendedCell getNextRow(final ExtendedCell cell) {
    ExtendedCell lowest = null;
    List<Segment> segments = getSegments();
    for (Segment segment : segments) {
      if (lowest == null) {
        lowest = getNextRow(cell, segment.getCellSet());
      } else {
        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
      }
    }
    return lowest;
  }

  long getInmemoryFlushSize() {
    return inmemoryFlushSize;
  }

  // debug method
  public void debug() {
    String msg = "active size=" + getActive().getDataSize();
    msg += " allow compaction is " + (allowCompaction.get() ? "true" : "false");
    msg +=
      " inMemoryCompactionInProgress is " + (inMemoryCompactionInProgress.get() ? "true" : "false");
    LOG.debug(msg);
  }

}