001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.atomic.AtomicBoolean;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellComparator;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.MemoryCompactionPolicy;
030import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.ClassSize;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.hadoop.hbase.wal.WAL;
035import org.apache.hadoop.util.StringUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * A memstore implementation which supports in-memory compaction. A compaction pipeline is added
042 * between the active set and the snapshot data structures; it consists of a list of segments that
043 * are subject to compaction. Like the snapshot, all pipeline segments are read-only; updates only
044 * affect the active set. To ensure this property we take advantage of the existing blocking
045 * mechanism -- the active set is pushed to the pipeline while holding the region's updatesLock in
046 * exclusive mode. Periodically, a compaction is applied in the background to all pipeline segments
047 * resulting in a single read-only component. The ``old'' segments are discarded when no scanner is
048 * reading them.
049 */
050@InterfaceAudience.Private
051public class CompactingMemStore extends AbstractMemStore {
052
  // Configuration key selecting the in-memory compaction behaviour of the compacting MemStore,
  // followed by its default value (the string form of MemoryCompactionPolicy.NONE).
  public static final String COMPACTING_MEMSTORE_TYPE_KEY =
    "hbase.hregion.compacting.memstore.type";
  public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
    String.valueOf(MemoryCompactionPolicy.NONE);
  // Configuration key for the in-memory-flush threshold expressed as a fraction of the
  // flush-to-disk size. initInmemoryFlushSize() reads it with a fallback of 0.0, in which case
  // the threshold is derived from the MSLAB chunk size instead.
  public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
    "hbase.memstore.inmemoryflush.threshold.factor";
  // Number of chunk sizes that make up the in-memory flush threshold when no factor is configured
  private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1;
  // Configuration key and default for the in-memory compaction pool size.
  // NOTE(review): "CONPACTION" is a misspelling of "COMPACTION"; the constants are public, so the
  // names are deliberately left untouched.
  public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
    "hbase.regionserver.inmemory.compaction.pool.size";
  public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;
066
  private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
  // The store this memstore belongs to
  private HStore store;
  // The read-only segments sitting between the active segment and the snapshot
  private CompactionPipeline pipeline;
  // Runs the in-memory compaction; created through createMemStoreCompactor()
  protected MemStoreCompactor compactor;

  private long inmemoryFlushSize; // the threshold on active size for in-memory flush
  // Raised (compareAndSet false -> true) by the thread that dispatches an in-memory compaction
  // and lowered again by setInMemoryCompactionCompleted()
  private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false);

  // inWalReplay is true while we are synchronously replaying the edits from WAL
  private boolean inWalReplay = false;

  // While false, inMemoryCompaction() returns without starting the compactor (used by tests)
  protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
  // When true snapshot() moves the whole pipeline into the snapshot, otherwise only its tail
  private boolean compositeSnapshot = true;
080
  /**
   * Types of indexes (part of immutable segments) to be used after flattening, compaction, or merge
   * are applied.
   */
  public enum IndexType {
    CSLM_MAP, // ConcurrentSkipListMap
    ARRAY_MAP, // CellArrayMap
    CHUNK_MAP // CellChunkMap
  }

  // Index type handed to the pipeline when flattening. The constructor overwrites this initial
  // value according to the MSLAB setting.
  private IndexType indexType = IndexType.ARRAY_MAP; // default implementation
092
  // Fixed heap overhead of a CompactingMemStore on top of AbstractMemStore, aligned through
  // ClassSize.align and including the pipeline's and the compactor's own overhead.
  public static final long DEEP_OVERHEAD =
    ClassSize.align(AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE // six references:
      // store, pipeline, compactor, inMemoryCompactionInProgress, allowCompaction, indexType
      + Bytes.SIZEOF_LONG // inmemoryFlushSize
      + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
      + 2 * ClassSize.ATOMIC_BOOLEAN // inMemoryCompactionInProgress and allowCompaction
      + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
102
103  public CompactingMemStore(Configuration conf, CellComparator c, HStore store,
104    RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
105    throws IOException {
106    super(conf, c, regionServices);
107    this.store = store;
108    this.regionServices = regionServices;
109    this.pipeline = new CompactionPipeline(getRegionServices());
110    this.compactor = createMemStoreCompactor(compactionPolicy);
111    if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
112      // if user requested to work with MSLABs (whether on- or off-heap), then the
113      // immutable segments are going to use CellChunkMap as their index
114      indexType = IndexType.CHUNK_MAP;
115    } else {
116      indexType = IndexType.ARRAY_MAP;
117    }
118    // initialization of the flush size should happen after initialization of the index type
119    // so do not transfer the following method
120    initInmemoryFlushSize(conf);
121    LOG.info(
122      "Store={}, in-memory flush size threshold={}, immutable segments index type={}, "
123        + "compactor={}",
124      this.store.getColumnFamilyName(), StringUtils.byteDesc(this.inmemoryFlushSize),
125      this.indexType, (this.compactor == null ? "NULL" : this.compactor.toString()));
126  }
127
128  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
129    throws IllegalArgumentIOException {
130    return new MemStoreCompactor(this, compactionPolicy);
131  }
132
133  private void initInmemoryFlushSize(Configuration conf) {
134    double factor = 0;
135    long memstoreFlushSize = getRegionServices().getMemStoreFlushSize();
136    int numStores = getRegionServices().getNumStores();
137    if (numStores <= 1) {
138      // Family number might also be zero in some of our unit test case
139      numStores = 1;
140    }
141    factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0);
142    if (factor != 0.0) {
143      // multiply by a factor (the same factor for all index types)
144      inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores;
145    } else {
146      inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER
147        * conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
148      inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER;
149    }
150  }
151
152  /**
153   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
154   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and the
155   *         memstore may be changed while computing its size. It is the responsibility of the
156   *         caller to make sure this doesn't happen.
157   */
158  @Override
159  public MemStoreSize size() {
160    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
161    memstoreSizing.incMemStoreSize(getActive().getMemStoreSize());
162    for (Segment item : pipeline.getSegments()) {
163      memstoreSizing.incMemStoreSize(item.getMemStoreSize());
164    }
165    return memstoreSizing.getMemStoreSize();
166  }
167
168  /**
169   * This method is called before the flush is executed.
170   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush is
171   *         executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
172   */
173  @Override
174  public long preFlushSeqIDEstimation() {
175    if (compositeSnapshot) {
176      return HConstants.NO_SEQNUM;
177    }
178    Segment segment = getLastSegment();
179    if (segment == null) {
180      return HConstants.NO_SEQNUM;
181    }
182    return segment.getMinSequenceId();
183  }
184
185  @Override
186  public boolean isSloppy() {
187    return true;
188  }
189
190  /**
191   * Push the current active memstore segment into the pipeline and create a snapshot of the tail of
192   * current compaction pipeline Snapshot must be cleared by call to {@link #clearSnapshot}.
193   * {@link #clearSnapshot(long)}.
194   * @return {@link MemStoreSnapshot}
195   */
196  @Override
197  public MemStoreSnapshot snapshot() {
198    // If snapshot currently has entries, then flusher failed or didn't call
199    // cleanup. Log a warning.
200    if (!this.snapshot.isEmpty()) {
201      LOG.warn("Snapshot called again without clearing previous. "
202        + "Doing nothing. Another ongoing flush or did we fail last attempt?");
203    } else {
204      LOG.debug("FLUSHING TO DISK {}, store={}",
205        getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
206      stopCompaction();
207      // region level lock ensures pushing active to pipeline is done in isolation
208      // no concurrent update operations trying to flush the active segment
209      pushActiveToPipeline(getActive(), true);
210      resetTimeOfOldestEdit();
211      snapshotId = EnvironmentEdgeManager.currentTime();
212      // in both cases whatever is pushed to snapshot is cleared from the pipeline
213      if (compositeSnapshot) {
214        pushPipelineToSnapshot();
215      } else {
216        pushTailToSnapshot();
217      }
218      compactor.resetStats();
219    }
220    return new MemStoreSnapshot(snapshotId, this.snapshot);
221  }
222
223  @Override
224  public MemStoreSize getFlushableSize() {
225    MemStoreSize mss = getSnapshotSize();
226    if (mss.getDataSize() == 0) {
227      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
228      if (compositeSnapshot) {
229        MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
230        MutableSegment currActive = getActive();
231        if (!currActive.isEmpty()) {
232          memStoreSizing.incMemStoreSize(currActive.getMemStoreSize());
233        }
234        mss = memStoreSizing.getMemStoreSize();
235      } else {
236        mss = pipeline.getTailSize();
237      }
238    }
239    return mss.getDataSize() > 0 ? mss : getActive().getMemStoreSize();
240  }
241
242  public void setInMemoryCompactionCompleted() {
243    inMemoryCompactionInProgress.set(false);
244  }
245
246  protected boolean setInMemoryCompactionFlag() {
247    return inMemoryCompactionInProgress.compareAndSet(false, true);
248  }
249
250  @Override
251  protected long keySize() {
252    // Need to consider dataSize/keySize of all segments in pipeline and active
253    long keySize = getActive().getDataSize();
254    for (Segment segment : this.pipeline.getSegments()) {
255      keySize += segment.getDataSize();
256    }
257    return keySize;
258  }
259
260  @Override
261  protected long heapSize() {
262    // Need to consider heapOverhead of all segments in pipeline and active
263    long h = getActive().getHeapSize();
264    for (Segment segment : this.pipeline.getSegments()) {
265      h += segment.getHeapSize();
266    }
267    return h;
268  }
269
270  @Override
271  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
272    long minSequenceId = pipeline.getMinSequenceId();
273    if (minSequenceId != Long.MAX_VALUE) {
274      byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
275      byte[] familyName = getFamilyNameInBytes();
276      WAL WAL = getRegionServices().getWAL();
277      if (WAL != null) {
278        WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
279      }
280    }
281  }
282
283  /**
284   * This message intends to inform the MemStore that next coming updates are going to be part of
285   * the replaying edits from WAL
286   */
287  @Override
288  public void startReplayingFromWAL() {
289    inWalReplay = true;
290  }
291
292  /**
293   * This message intends to inform the MemStore that the replaying edits from WAL are done
294   */
295  @Override
296  public void stopReplayingFromWAL() {
297    inWalReplay = false;
298  }
299
300  /**
301   * Issue any synchronization and test needed before applying the update For compacting memstore
302   * this means checking the update can increase the size without overflow
303   * @param currentActive  the segment to be updated
304   * @param cell           the cell to be added
305   * @param memstoreSizing object to accumulate region size changes
306   * @return true iff can proceed with applying the update
307   */
308  @Override
309  protected boolean preUpdate(MutableSegment currentActive, Cell cell,
310    MemStoreSizing memstoreSizing) {
311    if (currentActive.sharedLock()) {
312      if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) {
313        return true;
314      }
315      currentActive.sharedUnlock();
316    }
317    return false;
318  }
319
320  @Override
321  protected void postUpdate(MutableSegment currentActive) {
322    currentActive.sharedUnlock();
323  }
324
325  @Override
326  protected boolean sizeAddedPreOperation() {
327    return true;
328  }
329
330  // the getSegments() method is used for tests only
331  @Override
332  protected List<Segment> getSegments() {
333    List<? extends Segment> pipelineList = pipeline.getSegments();
334    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
335    list.add(getActive());
336    list.addAll(pipelineList);
337    list.addAll(snapshot.getAllSegments());
338
339    return list;
340  }
341
342  // the following three methods allow to manipulate the settings of composite snapshot
343  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
344    this.compositeSnapshot = useCompositeSnapshot;
345  }
346
347  public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
348    boolean merge) {
349    // last true stands for updating the region size
350    return pipeline.swap(versionedList, result, !merge, true);
351  }
352
353  /**
354   * @param requesterVersion The caller must hold the VersionedList of the pipeline with version
355   *                         taken earlier. This version must be passed as a parameter here. The
356   *                         flattening happens only if versions match.
357   */
358  public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) {
359    pipeline.flattenOneSegment(requesterVersion, indexType, action);
360  }
361
362  // setter is used only for testability
363  void setIndexType(IndexType type) {
364    indexType = type;
365    // Because this functionality is for testing only and tests are setting in-memory flush size
366    // according to their need, there is no setting of in-memory flush size, here.
367    // If it is needed, please change in-memory flush size explicitly
368  }
369
370  public IndexType getIndexType() {
371    return indexType;
372  }
373
374  public boolean hasImmutableSegments() {
375    return !pipeline.isEmpty();
376  }
377
378  public VersionedSegmentsList getImmutableSegments() {
379    return pipeline.getVersionedList();
380  }
381
382  public long getSmallestReadPoint() {
383    return store.getSmallestReadPoint();
384  }
385
386  public HStore getStore() {
387    return store;
388  }
389
390  public String getFamilyName() {
391    return Bytes.toString(getFamilyNameInBytes());
392  }
393
394  /**
395   * This method is protected under {@link HStore#lock} read lock.
396   */
397  @Override
398  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
399    MutableSegment activeTmp = getActive();
400    List<? extends Segment> pipelineList = pipeline.getSegments();
401    List<? extends Segment> snapshotList = snapshot.getAllSegments();
402    long numberOfSegments = 1L + pipelineList.size() + snapshotList.size();
403    // The list of elements in pipeline + the active element + the snapshot segment
404    List<KeyValueScanner> list = createList((int) numberOfSegments);
405    addToScanners(activeTmp, readPt, list);
406    addToScanners(pipelineList, readPt, list);
407    addToScanners(snapshotList, readPt, list);
408    return list;
409  }
410
411  protected List<KeyValueScanner> createList(int capacity) {
412    return new ArrayList<>(capacity);
413  }
414
415  /**
416   * Check whether anything need to be done based on the current active set size. The method is
417   * invoked upon every addition to the active set. For CompactingMemStore, flush the active set to
418   * the read-only memory if it's size is above threshold
419   * @param currActive     intended segment to update
420   * @param cellToAdd      cell to be added to the segment
421   * @param memstoreSizing object to accumulate changed size
422   * @return true if the cell can be added to the currActive
423   */
424  protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
425    MemStoreSizing memstoreSizing) {
426    long cellSize = MutableSegment.getCellLength(cellToAdd);
427    boolean successAdd = false;
428    while (true) {
429      long segmentDataSize = currActive.getDataSize();
430      if (!inWalReplay && segmentDataSize > inmemoryFlushSize) {
431        // when replaying edits from WAL there is no need in in-memory flush regardless the size
432        // otherwise size below flush threshold try to update atomically
433        break;
434      }
435      if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
436        if (memstoreSizing != null) {
437          memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
438        }
439        successAdd = true;
440        break;
441      }
442    }
443
444    if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) {
445      // size above flush threshold so we flush in memory
446      this.tryFlushInMemoryAndCompactingAsync(currActive);
447    }
448    return successAdd;
449  }
450
451  /**
452   * Try to flush the currActive in memory and submit the background
453   * {@link InMemoryCompactionRunnable} to
454   * {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the actual
455   * flushing in memory.
456   * @param currActive current Active Segment to be flush in memory.
457   */
458  private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) {
459    if (currActive.setInMemoryFlushed()) {
460      flushInMemory(currActive);
461      if (setInMemoryCompactionFlag()) {
462        // The thread is dispatched to do in-memory compaction in the background
463        InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
464        if (LOG.isTraceEnabled()) {
465          LOG.trace(
466            "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
467        }
468        getPool().execute(runnable);
469      }
470    }
471  }
472
473  // externally visible only for tests
474  // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
475  // otherwise there is a deadlock
476  void flushInMemory() {
477    MutableSegment currActive = getActive();
478    if (currActive.setInMemoryFlushed()) {
479      flushInMemory(currActive);
480    }
481    inMemoryCompaction();
482  }
483
484  protected void flushInMemory(MutableSegment currActive) {
485    LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
486    // NOTE: Due to concurrent writes and because we first add cell size to currActive.getDataSize
487    // and then actually add cell to currActive.cellSet, it is possible that
488    // currActive.getDataSize could not accommodate cellToAdd but currActive.cellSet is still
489    // empty if pending writes which not yet add cells to currActive.cellSet.
490    // so here we should not check currActive.isEmpty or not.
491    pushActiveToPipeline(currActive, false);
492  }
493
494  void inMemoryCompaction() {
495    // setting the inMemoryCompactionInProgress flag again for the case this method is invoked
496    // directly (only in tests) in the common path setting from true to true is idempotent
497    inMemoryCompactionInProgress.set(true);
498    // Used by tests
499    if (!allowCompaction.get()) {
500      return;
501    }
502    try {
503      // Speculative compaction execution, may be interrupted if flush is forced while
504      // compaction is in progress
505      if (!compactor.start()) {
506        setInMemoryCompactionCompleted();
507      }
508    } catch (IOException e) {
509      LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}",
510        getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
511    }
512  }
513
514  private Segment getLastSegment() {
515    Segment localActive = getActive();
516    Segment tail = pipeline.getTail();
517    return tail == null ? localActive : tail;
518  }
519
520  private byte[] getFamilyNameInBytes() {
521    return store.getColumnFamilyDescriptor().getName();
522  }
523
524  private ThreadPoolExecutor getPool() {
525    return getRegionServices().getInMemoryCompactionPool();
526  }
527
528  /**
529   * The request to cancel the compaction asynchronous task (caused by in-memory flush) The
530   * compaction may still happen if the request was sent too late Non-blocking request
531   */
532  private void stopCompaction() {
533    if (inMemoryCompactionInProgress.get()) {
534      compactor.stop();
535    }
536  }
537
538  /**
539   * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
540   * concurrent writes and because we first add cell size to currActive.getDataSize and then
541   * actually add cell to currActive.cellSet, it is possible that currActive.getDataSize could not
542   * accommodate cellToAdd but currActive.cellSet is still empty if pending writes which not yet add
543   * cells to currActive.cellSet,so for
544   * {@link CompactingMemStore#flushInMemory(MutableSegment)},checkEmpty parameter is false. But if
545   * {@link CompactingMemStore#snapshot} called this method,because there is no pending
546   * write,checkEmpty parameter could be true.
547   */
548  protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) {
549    if (!checkEmpty || !currActive.isEmpty()) {
550      pipeline.pushHead(currActive);
551      resetActive();
552    }
553  }
554
555  private void pushTailToSnapshot() {
556    VersionedSegmentsList segments = pipeline.getVersionedTail();
557    pushToSnapshot(segments.getStoreSegments());
558    // In Swap: don't close segments (they are in snapshot now) and don't update the region size
559    pipeline.swap(segments, null, false, false);
560  }
561
562  private void pushPipelineToSnapshot() {
563    int iterationsCnt = 0;
564    boolean done = false;
565    while (!done) {
566      iterationsCnt++;
567      VersionedSegmentsList segments = getImmutableSegments();
568      pushToSnapshot(segments.getStoreSegments());
569      // swap can return false in case the pipeline was updated by ongoing compaction
570      // and the version increase, the chance of it happenning is very low
571      // In Swap: don't close segments (they are in snapshot now) and don't update the region size
572      done = swapPipelineWithNull(segments);
573      if (iterationsCnt > 2) {
574        // practically it is impossible that this loop iterates more than two times
575        // (because the compaction is stopped and none restarts it while in snapshot request),
576        // however stopping here for the case of the infinite loop causing by any error
577        LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot,"
578          + " while flushing to disk.");
579        this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
580        break;
581      }
582    }
583  }
584
585  protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
586    return pipeline.swap(segments, null, false, false);
587  }
588
589  private void pushToSnapshot(List<ImmutableSegment> segments) {
590    if (segments.isEmpty()) return;
591    if (segments.size() == 1 && !segments.get(0).isEmpty()) {
592      this.snapshot = segments.get(0);
593      return;
594    } else { // create composite snapshot
595      this.snapshot =
596        SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
597    }
598  }
599
600  private RegionServicesForStores getRegionServices() {
601    return regionServices;
602  }
603
604  /**
605   * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per
606   * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline,
607   * releases updatesLock and compacts the pipeline.
608   */
609  private class InMemoryCompactionRunnable implements Runnable {
610    @Override
611    public void run() {
612      inMemoryCompaction();
613    }
614  }
615
616  boolean isMemStoreFlushingInMemory() {
617    return inMemoryCompactionInProgress.get();
618  }
619
620  /**
621   * @param cell Find the row that comes after this one. If null, we return the first.
622   * @return Next row or null if none found.
623   */
624  Cell getNextRow(final Cell cell) {
625    Cell lowest = null;
626    List<Segment> segments = getSegments();
627    for (Segment segment : segments) {
628      if (lowest == null) {
629        lowest = getNextRow(cell, segment.getCellSet());
630      } else {
631        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
632      }
633    }
634    return lowest;
635  }
636
637  long getInmemoryFlushSize() {
638    return inmemoryFlushSize;
639  }
640
641  // debug method
642  public void debug() {
643    String msg = "active size=" + getActive().getDataSize();
644    msg += " allow compaction is " + (allowCompaction.get() ? "true" : "false");
645    msg +=
646      " inMemoryCompactionInProgress is " + (inMemoryCompactionInProgress.get() ? "true" : "false");
647    LOG.debug(msg);
648  }
649
650}