001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.MemoryCompactionPolicy;
032import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.ClassSize;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.wal.WAL;
037import org.apache.hadoop.util.StringUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A memstore implementation which supports in-memory compaction.
044 * A compaction pipeline is added between the active set and the snapshot data structures;
045 * it consists of a list of segments that are subject to compaction.
046 * Like the snapshot, all pipeline segments are read-only; updates only affect the active set.
047 * To ensure this property we take advantage of the existing blocking mechanism -- the active set
048 * is pushed to the pipeline while holding the region's updatesLock in exclusive mode.
049 * Periodically, a compaction is applied in the background to all pipeline segments resulting
050 * in a single read-only component. The ``old'' segments are discarded when no scanner is reading
051 * them.
052 */
053@InterfaceAudience.Private
054public class CompactingMemStore extends AbstractMemStore {
055
056  // Configuration key and default value selecting the in-memory compaction policy
057  public static final String COMPACTING_MEMSTORE_TYPE_KEY =
058      "hbase.hregion.compacting.memstore.type";
059  public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
060      String.valueOf(MemoryCompactionPolicy.NONE);
061  // Configuration key: in-memory-flush threshold as a fraction of the flush-to-disk size
062  public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
063      "hbase.memstore.inmemoryflush.threshold.factor";
064  private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1; // applied to the MSLAB chunk size
065  // Configuration key and default for the in-memory compaction pool size (CONPACTION is sic)
066  public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
067      "hbase.regionserver.inmemory.compaction.pool.size";
068  public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;
069
070  private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
071  private HStore store;                 // the store this memstore belongs to
072  private CompactionPipeline pipeline;  // read-only segments between the active set and snapshot
073  protected MemStoreCompactor compactor;
074
075  private long inmemoryFlushSize;       // the threshold on active size for in-memory flush
076  private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false);
077
078  // inWalReplay is true while we are synchronously replaying the edits from WAL;
079  private boolean inWalReplay = false;  // while set, shouldFlushInMemory() ignores the threshold
080
081  protected final AtomicBoolean allowCompaction = new AtomicBoolean(true); // cleared by tests
082  private boolean compositeSnapshot = true; // true: snapshot() takes the whole pipeline, else tail
083
084  /**
085   * Types of indexes (part of immutable segments) to be used after flattening,
086   * compaction, or merge are applied.
087   */
088  public enum IndexType {
089    CSLM_MAP,   // ConcurrentSkipListMap
090    ARRAY_MAP,  // CellArrayMap
091    CHUNK_MAP   // CellChunkMap
092  }
093
094  private IndexType indexType = IndexType.ARRAY_MAP;  // initial value; reset in the constructor
095
096  public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
097      + 6 * ClassSize.REFERENCE     // Store, CompactionPipeline,
098      // MemStoreCompactor, inMemoryCompactionInProgress,
099      // allowCompaction, indexType
100      + Bytes.SIZEOF_LONG           // inmemoryFlushSize
101      + 2 * Bytes.SIZEOF_BOOLEAN    // compositeSnapshot and inWalReplay
102      + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction
103      + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
104
105  public CompactingMemStore(Configuration conf, CellComparator c,
106      HStore store, RegionServicesForStores regionServices,
107      MemoryCompactionPolicy compactionPolicy) throws IOException {
108    super(conf, c, regionServices);
109    this.store = store;
110    this.regionServices = regionServices;
111    this.pipeline = new CompactionPipeline(getRegionServices());
112    this.compactor = createMemStoreCompactor(compactionPolicy);
113    if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
114      // if user requested to work with MSLABs (whether on- or off-heap), then the
115      // immutable segments are going to use CellChunkMap as their index
116      indexType = IndexType.CHUNK_MAP;
117    } else {
118      indexType = IndexType.ARRAY_MAP;
119    }
120    // initialization of the flush size should happen after initialization of the index type
121    // so do not transfer the following method
122    initInmemoryFlushSize(conf);
123    LOG.info("Store={}, in-memory flush size threshold={}, immutable segments index type={}, " +
124            "compactor={}", this.store.getColumnFamilyName(),
125        StringUtils.byteDesc(this.inmemoryFlushSize), this.indexType,
126        (this.compactor == null? "NULL": this.compactor.toString()));
127  }
128
129  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
130      throws IllegalArgumentIOException {
131    return new MemStoreCompactor(this, compactionPolicy);
132  }
133
134  private void initInmemoryFlushSize(Configuration conf) {
135    double factor = 0;
136    long memstoreFlushSize = getRegionServices().getMemStoreFlushSize();
137    int numStores = getRegionServices().getNumStores();
138    if (numStores <= 1) {
139      // Family number might also be zero in some of our unit test case
140      numStores = 1;
141    }
142    factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0);
143    if(factor != 0.0) {
144      // multiply by a factor (the same factor for all index types)
145      inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores;
146    } else {
147      inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER *
148          conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
149      inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER;
150    }
151  }
152
153  /**
154   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
155   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and
156   *         the memstore may be changed while computing its size. It is the responsibility of the
157   *         caller to make sure this doesn't happen.
158   */
159  @Override
160  public MemStoreSize size() {
161    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
162    memstoreSizing.incMemStoreSize(getActive().getMemStoreSize());
163    for (Segment item : pipeline.getSegments()) {
164      memstoreSizing.incMemStoreSize(item.getMemStoreSize());
165    }
166    return memstoreSizing.getMemStoreSize();
167  }
168
169  /**
170   * This method is called before the flush is executed.
171   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
172   * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
173   */
174  @Override
175  public long preFlushSeqIDEstimation() {
176    if(compositeSnapshot) {
177      return HConstants.NO_SEQNUM;
178    }
179    Segment segment = getLastSegment();
180    if(segment == null) {
181      return HConstants.NO_SEQNUM;
182    }
183    return segment.getMinSequenceId();
184  }
185
186  @Override
187  public boolean isSloppy() {
188    return true;
189  }
190
191  /**
192   * Push the current active memstore segment into the pipeline
193   * and create a snapshot of the tail of current compaction pipeline
194   * Snapshot must be cleared by call to {@link #clearSnapshot}.
195   * {@link #clearSnapshot(long)}.
196   * @return {@link MemStoreSnapshot}
197   */
198  @Override
199  public MemStoreSnapshot snapshot() {
200    // If snapshot currently has entries, then flusher failed or didn't call
201    // cleanup.  Log a warning.
202    if (!this.snapshot.isEmpty()) {
203      LOG.warn("Snapshot called again without clearing previous. " +
204          "Doing nothing. Another ongoing flush or did we fail last attempt?");
205    } else {
206      LOG.debug("FLUSHING TO DISK {}, store={}",
207          getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
208      stopCompaction();
209      // region level lock ensures pushing active to pipeline is done in isolation
210      // no concurrent update operations trying to flush the active segment
211      pushActiveToPipeline(getActive());
212      resetTimeOfOldestEdit();
213      snapshotId = EnvironmentEdgeManager.currentTime();
214      // in both cases whatever is pushed to snapshot is cleared from the pipeline
215      if (compositeSnapshot) {
216        pushPipelineToSnapshot();
217      } else {
218        pushTailToSnapshot();
219      }
220      compactor.resetStats();
221    }
222    return new MemStoreSnapshot(snapshotId, this.snapshot);
223  }
224
225  @Override
226  public MemStoreSize getFlushableSize() {
227    MemStoreSize mss = getSnapshotSize();
228    if (mss.getDataSize() == 0) {
229      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
230      if (compositeSnapshot) {
231        MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
232        MutableSegment currActive = getActive();
233        if(!currActive.isEmpty()) {
234          memStoreSizing.incMemStoreSize(currActive.getMemStoreSize());
235        }
236        mss = memStoreSizing.getMemStoreSize();
237      } else {
238        mss = pipeline.getTailSize();
239      }
240    }
241    return mss.getDataSize() > 0? mss: getActive().getMemStoreSize();
242  }
243
244
245  public void setInMemoryCompactionCompleted() {
246    inMemoryCompactionInProgress.set(false);
247  }
248
249  protected boolean setInMemoryCompactionFlag() {
250    return inMemoryCompactionInProgress.compareAndSet(false, true);
251  }
252
253  @Override
254  protected long keySize() {
255    // Need to consider dataSize/keySize of all segments in pipeline and active
256    long keySize = getActive().getDataSize();
257    for (Segment segment : this.pipeline.getSegments()) {
258      keySize += segment.getDataSize();
259    }
260    return keySize;
261  }
262
263  @Override
264  protected long heapSize() {
265    // Need to consider heapOverhead of all segments in pipeline and active
266    long h = getActive().getHeapSize();
267    for (Segment segment : this.pipeline.getSegments()) {
268      h += segment.getHeapSize();
269    }
270    return h;
271  }
272
273  @Override
274  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
275    long minSequenceId = pipeline.getMinSequenceId();
276    if(minSequenceId != Long.MAX_VALUE) {
277      byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
278      byte[] familyName = getFamilyNameInBytes();
279      WAL WAL = getRegionServices().getWAL();
280      if (WAL != null) {
281        WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
282      }
283    }
284  }
285
286  /**
287   * This message intends to inform the MemStore that next coming updates
288   * are going to be part of the replaying edits from WAL
289   */
290  @Override
291  public void startReplayingFromWAL() {
292    inWalReplay = true;
293  }
294
295  /**
296   * This message intends to inform the MemStore that the replaying edits from WAL
297   * are done
298   */
299  @Override
300  public void stopReplayingFromWAL() {
301    inWalReplay = false;
302  }
303
304  /**
305   * Issue any synchronization and test needed before applying the update
306   * For compacting memstore this means checking the update can increase the size without
307   * overflow
308   * @param currentActive the segment to be updated
309   * @param cell the cell to be added
310   * @param memstoreSizing object to accumulate region size changes
311   * @return true iff can proceed with applying the update
312   */
313  @Override
314  protected boolean preUpdate(MutableSegment currentActive, Cell cell,
315      MemStoreSizing memstoreSizing) {
316    if (currentActive.sharedLock()) {
317      if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) {
318        return true;
319      }
320      currentActive.sharedUnlock();
321    }
322    return false;
323  }
324
325  @Override protected void postUpdate(MutableSegment currentActive) {
326    currentActive.sharedUnlock();
327  }
328
329  @Override protected boolean sizeAddedPreOperation() {
330    return true;
331  }
332
333  // the getSegments() method is used for tests only
334  @Override
335  protected List<Segment> getSegments() {
336    List<? extends Segment> pipelineList = pipeline.getSegments();
337    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
338    list.add(getActive());
339    list.addAll(pipelineList);
340    list.addAll(snapshot.getAllSegments());
341
342    return list;
343  }
344
345  // the following three methods allow to manipulate the settings of composite snapshot
346  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
347    this.compositeSnapshot = useCompositeSnapshot;
348  }
349
350  public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
351      boolean merge) {
352    // last true stands for updating the region size
353    return pipeline.swap(versionedList, result, !merge, true);
354  }
355
356  /**
357   * @param requesterVersion The caller must hold the VersionedList of the pipeline
358   *           with version taken earlier. This version must be passed as a parameter here.
359   *           The flattening happens only if versions match.
360   */
361  public void flattenOneSegment(long requesterVersion,  MemStoreCompactionStrategy.Action action) {
362    pipeline.flattenOneSegment(requesterVersion, indexType, action);
363  }
364
365  // setter is used only for testability
366  void setIndexType(IndexType type) {
367    indexType = type;
368    // Because this functionality is for testing only and tests are setting in-memory flush size
369    // according to their need, there is no setting of in-memory flush size, here.
370    // If it is needed, please change in-memory flush size explicitly
371  }
372
373  public IndexType getIndexType() {
374    return indexType;
375  }
376
377  public boolean hasImmutableSegments() {
378    return !pipeline.isEmpty();
379  }
380
381  public VersionedSegmentsList getImmutableSegments() {
382    return pipeline.getVersionedList();
383  }
384
385  public long getSmallestReadPoint() {
386    return store.getSmallestReadPoint();
387  }
388
389  public HStore getStore() {
390    return store;
391  }
392
393  public String getFamilyName() {
394    return Bytes.toString(getFamilyNameInBytes());
395  }
396
397  @Override
398  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
399    MutableSegment activeTmp = getActive();
400    List<? extends Segment> pipelineList = pipeline.getSegments();
401    List<? extends Segment> snapshotList = snapshot.getAllSegments();
402    long numberOfSegments = 1L + pipelineList.size() + snapshotList.size();
403    // The list of elements in pipeline + the active element + the snapshot segment
404    List<KeyValueScanner> list = createList((int) numberOfSegments);
405    addToScanners(activeTmp, readPt, list);
406    addToScanners(pipelineList, readPt, list);
407    addToScanners(snapshotList, readPt, list);
408    return list;
409  }
410
411  protected List<KeyValueScanner> createList(int capacity) {
412    return new ArrayList<>(capacity);
413  }
414
415  /**
416   * Check whether anything need to be done based on the current active set size.
417   * The method is invoked upon every addition to the active set.
418   * For CompactingMemStore, flush the active set to the read-only memory if it's
419   * size is above threshold
420   * @param currActive intended segment to update
421   * @param cellToAdd cell to be added to the segment
422   * @param memstoreSizing object to accumulate changed size
423   * @return true if the cell can be added to the
424   */
425  private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
426      MemStoreSizing memstoreSizing) {
427    if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
428      if (currActive.setInMemoryFlushed()) {
429        flushInMemory(currActive);
430        if (setInMemoryCompactionFlag()) {
431          // The thread is dispatched to do in-memory compaction in the background
432          InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
433          if (LOG.isTraceEnabled()) {
434            LOG.trace("Dispatching the MemStore in-memory flush for store " + store
435                .getColumnFamilyName());
436          }
437          getPool().execute(runnable);
438        }
439      }
440      return false;
441    }
442    return true;
443  }
444
445  // externally visible only for tests
446  // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
447  // otherwise there is a deadlock
448  void flushInMemory() {
449    MutableSegment currActive = getActive();
450    if(currActive.setInMemoryFlushed()) {
451      flushInMemory(currActive);
452    }
453    inMemoryCompaction();
454  }
455
456  private void flushInMemory(MutableSegment currActive) {
457    LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
458    pushActiveToPipeline(currActive);
459  }
460
461  void inMemoryCompaction() {
462    // setting the inMemoryCompactionInProgress flag again for the case this method is invoked
463    // directly (only in tests) in the common path setting from true to true is idempotent
464    inMemoryCompactionInProgress.set(true);
465    // Used by tests
466    if (!allowCompaction.get()) {
467      return;
468    }
469    try {
470      // Speculative compaction execution, may be interrupted if flush is forced while
471      // compaction is in progress
472      if(!compactor.start()) {
473        setInMemoryCompactionCompleted();
474      }
475    } catch (IOException e) {
476      LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}",
477          getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
478    }
479  }
480
481  private Segment getLastSegment() {
482    Segment localActive = getActive();
483    Segment tail = pipeline.getTail();
484    return tail == null ? localActive : tail;
485  }
486
487  private byte[] getFamilyNameInBytes() {
488    return store.getColumnFamilyDescriptor().getName();
489  }
490
491  private ThreadPoolExecutor getPool() {
492    return getRegionServices().getInMemoryCompactionPool();
493  }
494
495  protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
496      MemStoreSizing memstoreSizing) {
497    long cellSize = MutableSegment.getCellLength(cellToAdd);
498    long segmentDataSize = currActive.getDataSize();
499    while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
500      // when replaying edits from WAL there is no need in in-memory flush regardless the size
501      // otherwise size below flush threshold try to update atomically
502      if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
503        if (memstoreSizing != null) {
504          memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
505        }
506        // enough space for cell - no need to flush
507        return false;
508      }
509      segmentDataSize = currActive.getDataSize();
510    }
511    // size above flush threshold
512    return true;
513  }
514
515  /**
516   * The request to cancel the compaction asynchronous task (caused by in-memory flush)
517   * The compaction may still happen if the request was sent too late
518   * Non-blocking request
519   */
520  private void stopCompaction() {
521    if (inMemoryCompactionInProgress.get()) {
522      compactor.stop();
523    }
524  }
525
526  protected void pushActiveToPipeline(MutableSegment currActive) {
527    if (!currActive.isEmpty()) {
528      pipeline.pushHead(currActive);
529      resetActive();
530    }
531  }
532
533  private void pushTailToSnapshot() {
534    VersionedSegmentsList segments = pipeline.getVersionedTail();
535    pushToSnapshot(segments.getStoreSegments());
536    // In Swap: don't close segments (they are in snapshot now) and don't update the region size
537    pipeline.swap(segments,null,false, false);
538  }
539
540  private void pushPipelineToSnapshot() {
541    int iterationsCnt = 0;
542    boolean done = false;
543    while (!done) {
544      iterationsCnt++;
545      VersionedSegmentsList segments = pipeline.getVersionedList();
546      pushToSnapshot(segments.getStoreSegments());
547      // swap can return false in case the pipeline was updated by ongoing compaction
548      // and the version increase, the chance of it happenning is very low
549      // In Swap: don't close segments (they are in snapshot now) and don't update the region size
550      done = pipeline.swap(segments, null, false, false);
551      if (iterationsCnt>2) {
552        // practically it is impossible that this loop iterates more than two times
553        // (because the compaction is stopped and none restarts it while in snapshot request),
554        // however stopping here for the case of the infinite loop causing by any error
555        LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," +
556            " while flushing to disk.");
557        this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
558        break;
559      }
560    }
561  }
562
563  private void pushToSnapshot(List<ImmutableSegment> segments) {
564    if(segments.isEmpty()) return;
565    if(segments.size() == 1 && !segments.get(0).isEmpty()) {
566      this.snapshot = segments.get(0);
567      return;
568    } else { // create composite snapshot
569      this.snapshot =
570          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
571    }
572  }
573
574  private RegionServicesForStores getRegionServices() {
575    return regionServices;
576  }
577
578  /**
579   * The in-memory-flusher thread performs the flush asynchronously.
580   * There is at most one thread per memstore instance.
581   * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
582   * and compacts the pipeline.
583   */
584  private class InMemoryCompactionRunnable implements Runnable {
585    @Override
586    public void run() {
587      inMemoryCompaction();
588    }
589  }
590
591  boolean isMemStoreFlushingInMemory() {
592    return inMemoryCompactionInProgress.get();
593  }
594
595  /**
596   * @param cell Find the row that comes after this one.  If null, we return the
597   *             first.
598   * @return Next row or null if none found.
599   */
600  Cell getNextRow(final Cell cell) {
601    Cell lowest = null;
602    List<Segment> segments = getSegments();
603    for (Segment segment : segments) {
604      if (lowest == null) {
605        lowest = getNextRow(cell, segment.getCellSet());
606      } else {
607        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
608      }
609    }
610    return lowest;
611  }
612
613  long getInmemoryFlushSize() {
614    return inmemoryFlushSize;
615  }
616
617  // debug method
618  public void debug() {
619    String msg = "active size=" + getActive().getDataSize();
620    msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
621    msg += " inMemoryCompactionInProgress is "+ (inMemoryCompactionInProgress.get() ? "true" :
622        "false");
623    LOG.debug(msg);
624  }
625
626}