001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.atomic.AtomicBoolean; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.CellComparator; 028import org.apache.hadoop.hbase.ExtendedCell; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.MemoryCompactionPolicy; 031import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.hbase.util.ClassSize; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.hbase.wal.WAL; 036import org.apache.hadoop.util.StringUtils; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * A memstore implementation which supports in-memory compaction. A compaction pipeline is added 043 * between the active set and the snapshot data structures; it consists of a list of segments that 044 * are subject to compaction. Like the snapshot, all pipeline segments are read-only; updates only 045 * affect the active set. To ensure this property we take advantage of the existing blocking 046 * mechanism -- the active set is pushed to the pipeline while holding the region's updatesLock in 047 * exclusive mode. Periodically, a compaction is applied in the background to all pipeline segments 048 * resulting in a single read-only component. The ``old'' segments are discarded when no scanner is 049 * reading them. 050 */ 051@InterfaceAudience.Private 052public class CompactingMemStore extends AbstractMemStore { 053 054 // The external setting of the compacting MemStore behaviour 055 public static final String COMPACTING_MEMSTORE_TYPE_KEY = 056 "hbase.hregion.compacting.memstore.type"; 057 public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = 058 String.valueOf(MemoryCompactionPolicy.NONE); 059 // Default fraction of in-memory-flush size w.r.t. flush-to-disk size 060 public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = 061 "hbase.memstore.inmemoryflush.threshold.factor"; 062 private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1; 063 // In-Memory compaction pool size 064 public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY = 065 "hbase.regionserver.inmemory.compaction.pool.size"; 066 public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10; 067 068 private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class); 069 private HStore store; 070 private CompactionPipeline pipeline; 071 protected MemStoreCompactor compactor; 072 073 private long inmemoryFlushSize; // the threshold on active size for in-memory flush 074 private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false); 075 076 // inWalReplay is true while we are synchronously replaying the edits from WAL 077 private boolean inWalReplay = false; 078 079 protected final AtomicBoolean allowCompaction = new AtomicBoolean(true); 080 private boolean compositeSnapshot = true; 081 082 /** 083 * Types of indexes (part of immutable segments) to be used after flattening, compaction, or merge 084 * are applied. 085 */ 086 public enum IndexType { 087 CSLM_MAP, // ConcurrentSkipLisMap 088 ARRAY_MAP, // CellArrayMap 089 CHUNK_MAP // CellChunkMap 090 } 091 092 private IndexType indexType = IndexType.ARRAY_MAP; // default implementation 093 094 public static final long DEEP_OVERHEAD = 095 ClassSize.align(AbstractMemStore.DEEP_OVERHEAD + 6 * ClassSize.REFERENCE // Store, 096 // CompactionPipeline, 097 // MemStoreCompactor, inMemoryCompactionInProgress, 098 // allowCompaction, indexType 099 + Bytes.SIZEOF_LONG // inmemoryFlushSize 100 + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay 101 + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction 102 + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD); 103 104 public CompactingMemStore(Configuration conf, CellComparator c, HStore store, 105 RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy) 106 throws IOException { 107 super(conf, c, regionServices); 108 this.store = store; 109 this.regionServices = regionServices; 110 this.pipeline = new CompactionPipeline(getRegionServices()); 111 this.compactor = createMemStoreCompactor(compactionPolicy); 112 if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) { 113 // if user requested to work with MSLABs (whether on- or off-heap), then the 114 // immutable segments are going to use CellChunkMap as their index 115 indexType = IndexType.CHUNK_MAP; 116 } else { 117 indexType = IndexType.ARRAY_MAP; 118 } 119 // initialization of the flush size should happen after initialization of the index type 120 // so do not transfer the following method 121 initInmemoryFlushSize(conf); 122 LOG.info( 123 "Store={}, in-memory flush size threshold={}, immutable segments index type={}, " 124 + "compactor={}", 125 this.store.getColumnFamilyName(), StringUtils.byteDesc(this.inmemoryFlushSize), 126 this.indexType, (this.compactor == null ? "NULL" : this.compactor.toString())); 127 } 128 129 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 130 throws IllegalArgumentIOException { 131 return new MemStoreCompactor(this, compactionPolicy); 132 } 133 134 private void initInmemoryFlushSize(Configuration conf) { 135 double factor = 0; 136 long memstoreFlushSize = getRegionServices().getMemStoreFlushSize(); 137 int numStores = getRegionServices().getNumStores(); 138 if (numStores <= 1) { 139 // Family number might also be zero in some of our unit test case 140 numStores = 1; 141 } 142 factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0); 143 if (factor != 0.0) { 144 // multiply by a factor (the same factor for all index types) 145 inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores; 146 } else { 147 inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER 148 * conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); 149 inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER; 150 } 151 } 152 153 /** 154 * @return Total memory occupied by this MemStore. This won't include any size occupied by the 155 * snapshot. We assume the snapshot will get cleared soon. This is not thread safe and the 156 * memstore may be changed while computing its size. It is the responsibility of the 157 * caller to make sure this doesn't happen. 158 */ 159 @Override 160 public MemStoreSize size() { 161 MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); 162 memstoreSizing.incMemStoreSize(getActive().getMemStoreSize()); 163 for (Segment item : pipeline.getSegments()) { 164 memstoreSizing.incMemStoreSize(item.getMemStoreSize()); 165 } 166 return memstoreSizing.getMemStoreSize(); 167 } 168 169 /** 170 * This method is called before the flush is executed. 171 * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush is 172 * executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}. 173 */ 174 @Override 175 public long preFlushSeqIDEstimation() { 176 if (compositeSnapshot) { 177 return HConstants.NO_SEQNUM; 178 } 179 Segment segment = getLastSegment(); 180 if (segment == null) { 181 return HConstants.NO_SEQNUM; 182 } 183 return segment.getMinSequenceId(); 184 } 185 186 @Override 187 public boolean isSloppy() { 188 return true; 189 } 190 191 /** 192 * Push the current active memstore segment into the pipeline and create a snapshot of the tail of 193 * current compaction pipeline Snapshot must be cleared by call to {@link #clearSnapshot}. 194 * {@link #clearSnapshot(long)}. 195 * @return {@link MemStoreSnapshot} 196 */ 197 @Override 198 public MemStoreSnapshot snapshot() { 199 // If snapshot currently has entries, then flusher failed or didn't call 200 // cleanup. Log a warning. 201 if (!this.snapshot.isEmpty()) { 202 LOG.warn("Snapshot called again without clearing previous. " 203 + "Doing nothing. Another ongoing flush or did we fail last attempt?"); 204 } else { 205 LOG.debug("FLUSHING TO DISK {}, store={}", 206 getRegionServices().getRegionInfo().getEncodedName(), getFamilyName()); 207 stopCompaction(); 208 // region level lock ensures pushing active to pipeline is done in isolation 209 // no concurrent update operations trying to flush the active segment 210 pushActiveToPipeline(getActive(), true); 211 resetTimeOfOldestEdit(); 212 snapshotId = EnvironmentEdgeManager.currentTime(); 213 // in both cases whatever is pushed to snapshot is cleared from the pipeline 214 if (compositeSnapshot) { 215 pushPipelineToSnapshot(); 216 } else { 217 pushTailToSnapshot(); 218 } 219 compactor.resetStats(); 220 } 221 return new MemStoreSnapshot(snapshotId, this.snapshot); 222 } 223 224 @Override 225 public MemStoreSize getFlushableSize() { 226 MemStoreSize mss = getSnapshotSize(); 227 if (mss.getDataSize() == 0) { 228 // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed 229 if (compositeSnapshot) { 230 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize()); 231 MutableSegment currActive = getActive(); 232 if (!currActive.isEmpty()) { 233 memStoreSizing.incMemStoreSize(currActive.getMemStoreSize()); 234 } 235 mss = memStoreSizing.getMemStoreSize(); 236 } else { 237 mss = pipeline.getTailSize(); 238 } 239 } 240 return mss.getDataSize() > 0 ? mss : getActive().getMemStoreSize(); 241 } 242 243 public void setInMemoryCompactionCompleted() { 244 inMemoryCompactionInProgress.set(false); 245 } 246 247 protected boolean setInMemoryCompactionFlag() { 248 return inMemoryCompactionInProgress.compareAndSet(false, true); 249 } 250 251 @Override 252 protected long keySize() { 253 // Need to consider dataSize/keySize of all segments in pipeline and active 254 long keySize = getActive().getDataSize(); 255 for (Segment segment : this.pipeline.getSegments()) { 256 keySize += segment.getDataSize(); 257 } 258 return keySize; 259 } 260 261 @Override 262 protected long heapSize() { 263 // Need to consider heapOverhead of all segments in pipeline and active 264 long h = getActive().getHeapSize(); 265 for (Segment segment : this.pipeline.getSegments()) { 266 h += segment.getHeapSize(); 267 } 268 return h; 269 } 270 271 @Override 272 public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) { 273 long minSequenceId = pipeline.getMinSequenceId(); 274 if (minSequenceId != Long.MAX_VALUE) { 275 byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes(); 276 byte[] familyName = getFamilyNameInBytes(); 277 WAL WAL = getRegionServices().getWAL(); 278 if (WAL != null) { 279 WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater); 280 } 281 } 282 } 283 284 /** 285 * This message intends to inform the MemStore that next coming updates are going to be part of 286 * the replaying edits from WAL 287 */ 288 @Override 289 public void startReplayingFromWAL() { 290 inWalReplay = true; 291 } 292 293 /** 294 * This message intends to inform the MemStore that the replaying edits from WAL are done 295 */ 296 @Override 297 public void stopReplayingFromWAL() { 298 inWalReplay = false; 299 } 300 301 /** 302 * Issue any synchronization and test needed before applying the update For compacting memstore 303 * this means checking the update can increase the size without overflow 304 * @param currentActive the segment to be updated 305 * @param cell the cell to be added 306 * @param memstoreSizing object to accumulate region size changes 307 * @return true iff can proceed with applying the update 308 */ 309 @Override 310 protected boolean preUpdate(MutableSegment currentActive, ExtendedCell cell, 311 MemStoreSizing memstoreSizing) { 312 if (currentActive.sharedLock()) { 313 if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) { 314 return true; 315 } 316 currentActive.sharedUnlock(); 317 } 318 return false; 319 } 320 321 @Override 322 protected void postUpdate(MutableSegment currentActive) { 323 currentActive.sharedUnlock(); 324 } 325 326 @Override 327 protected boolean sizeAddedPreOperation() { 328 return true; 329 } 330 331 // the getSegments() method is used for tests only 332 @Override 333 protected List<Segment> getSegments() { 334 List<? extends Segment> pipelineList = pipeline.getSegments(); 335 List<Segment> list = new ArrayList<>(pipelineList.size() + 2); 336 list.add(getActive()); 337 list.addAll(pipelineList); 338 list.addAll(snapshot.getAllSegments()); 339 340 return list; 341 } 342 343 // the following three methods allow to manipulate the settings of composite snapshot 344 public void setCompositeSnapshot(boolean useCompositeSnapshot) { 345 this.compositeSnapshot = useCompositeSnapshot; 346 } 347 348 public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, 349 boolean merge) { 350 // last true stands for updating the region size 351 return pipeline.swap(versionedList, result, !merge, true); 352 } 353 354 /** 355 * @param requesterVersion The caller must hold the VersionedList of the pipeline with version 356 * taken earlier. This version must be passed as a parameter here. The 357 * flattening happens only if versions match. 358 */ 359 public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) { 360 pipeline.flattenOneSegment(requesterVersion, indexType, action); 361 } 362 363 // setter is used only for testability 364 void setIndexType(IndexType type) { 365 indexType = type; 366 // Because this functionality is for testing only and tests are setting in-memory flush size 367 // according to their need, there is no setting of in-memory flush size, here. 368 // If it is needed, please change in-memory flush size explicitly 369 } 370 371 public IndexType getIndexType() { 372 return indexType; 373 } 374 375 public boolean hasImmutableSegments() { 376 return !pipeline.isEmpty(); 377 } 378 379 public VersionedSegmentsList getImmutableSegments() { 380 return pipeline.getVersionedList(); 381 } 382 383 public long getSmallestReadPoint() { 384 return store.getSmallestReadPoint(); 385 } 386 387 public HStore getStore() { 388 return store; 389 } 390 391 public String getFamilyName() { 392 return Bytes.toString(getFamilyNameInBytes()); 393 } 394 395 /** 396 * This method is protected under {@link HStore#lock} read lock. 397 */ 398 @Override 399 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 400 MutableSegment activeTmp = getActive(); 401 List<? extends Segment> pipelineList = pipeline.getSegments(); 402 List<? extends Segment> snapshotList = snapshot.getAllSegments(); 403 long numberOfSegments = 1L + pipelineList.size() + snapshotList.size(); 404 // The list of elements in pipeline + the active element + the snapshot segment 405 List<KeyValueScanner> list = createList((int) numberOfSegments); 406 addToScanners(activeTmp, readPt, list); 407 addToScanners(pipelineList, readPt, list); 408 addToScanners(snapshotList, readPt, list); 409 return list; 410 } 411 412 protected List<KeyValueScanner> createList(int capacity) { 413 return new ArrayList<>(capacity); 414 } 415 416 /** 417 * Check whether anything need to be done based on the current active set size. The method is 418 * invoked upon every addition to the active set. For CompactingMemStore, flush the active set to 419 * the read-only memory if it's size is above threshold 420 * @param currActive intended segment to update 421 * @param cellToAdd cell to be added to the segment 422 * @param memstoreSizing object to accumulate changed size 423 * @return true if the cell can be added to the currActive 424 */ 425 protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 426 MemStoreSizing memstoreSizing) { 427 long cellSize = MutableSegment.getCellLength(cellToAdd); 428 boolean successAdd = false; 429 while (true) { 430 long segmentDataSize = currActive.getDataSize(); 431 if (!inWalReplay && segmentDataSize > inmemoryFlushSize) { 432 // when replaying edits from WAL there is no need in in-memory flush regardless the size 433 // otherwise size below flush threshold try to update atomically 434 break; 435 } 436 if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) { 437 if (memstoreSizing != null) { 438 memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0); 439 } 440 successAdd = true; 441 break; 442 } 443 } 444 445 if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) { 446 // size above flush threshold so we flush in memory 447 this.tryFlushInMemoryAndCompactingAsync(currActive); 448 } 449 return successAdd; 450 } 451 452 /** 453 * Try to flush the currActive in memory and submit the background 454 * {@link InMemoryCompactionRunnable} to 455 * {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the actual 456 * flushing in memory. 457 * @param currActive current Active Segment to be flush in memory. 458 */ 459 private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) { 460 if (currActive.setInMemoryFlushed()) { 461 flushInMemory(currActive); 462 if (setInMemoryCompactionFlag()) { 463 // The thread is dispatched to do in-memory compaction in the background 464 InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable(); 465 if (LOG.isTraceEnabled()) { 466 LOG.trace( 467 "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName()); 468 } 469 getPool().execute(runnable); 470 } 471 } 472 } 473 474 // externally visible only for tests 475 // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock, 476 // otherwise there is a deadlock 477 void flushInMemory() { 478 MutableSegment currActive = getActive(); 479 if (currActive.setInMemoryFlushed()) { 480 flushInMemory(currActive); 481 } 482 inMemoryCompaction(); 483 } 484 485 protected void flushInMemory(MutableSegment currActive) { 486 LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); 487 // NOTE: Due to concurrent writes and because we first add cell size to currActive.getDataSize 488 // and then actually add cell to currActive.cellSet, it is possible that 489 // currActive.getDataSize could not accommodate cellToAdd but currActive.cellSet is still 490 // empty if pending writes which not yet add cells to currActive.cellSet. 491 // so here we should not check currActive.isEmpty or not. 492 pushActiveToPipeline(currActive, false); 493 } 494 495 void inMemoryCompaction() { 496 // setting the inMemoryCompactionInProgress flag again for the case this method is invoked 497 // directly (only in tests) in the common path setting from true to true is idempotent 498 inMemoryCompactionInProgress.set(true); 499 // Used by tests 500 if (!allowCompaction.get()) { 501 return; 502 } 503 try { 504 // Speculative compaction execution, may be interrupted if flush is forced while 505 // compaction is in progress 506 if (!compactor.start()) { 507 setInMemoryCompactionCompleted(); 508 } 509 } catch (IOException e) { 510 LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}", 511 getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e); 512 } 513 } 514 515 private Segment getLastSegment() { 516 Segment localActive = getActive(); 517 Segment tail = pipeline.getTail(); 518 return tail == null ? localActive : tail; 519 } 520 521 private byte[] getFamilyNameInBytes() { 522 return store.getColumnFamilyDescriptor().getName(); 523 } 524 525 private ThreadPoolExecutor getPool() { 526 return getRegionServices().getInMemoryCompactionPool(); 527 } 528 529 /** 530 * The request to cancel the compaction asynchronous task (caused by in-memory flush) The 531 * compaction may still happen if the request was sent too late Non-blocking request 532 */ 533 private void stopCompaction() { 534 if (inMemoryCompactionInProgress.get()) { 535 compactor.stop(); 536 } 537 } 538 539 /** 540 * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to 541 * concurrent writes and because we first add cell size to currActive.getDataSize and then 542 * actually add cell to currActive.cellSet, it is possible that currActive.getDataSize could not 543 * accommodate cellToAdd but currActive.cellSet is still empty if pending writes which not yet add 544 * cells to currActive.cellSet,so for 545 * {@link CompactingMemStore#flushInMemory(MutableSegment)},checkEmpty parameter is false. But if 546 * {@link CompactingMemStore#snapshot} called this method,because there is no pending 547 * write,checkEmpty parameter could be true. 548 */ 549 protected void pushActiveToPipeline(MutableSegment currActive, boolean checkEmpty) { 550 if (!checkEmpty || !currActive.isEmpty()) { 551 pipeline.pushHead(currActive); 552 resetActive(); 553 } 554 } 555 556 private void pushTailToSnapshot() { 557 VersionedSegmentsList segments = pipeline.getVersionedTail(); 558 pushToSnapshot(segments.getStoreSegments()); 559 // In Swap: don't close segments (they are in snapshot now) and don't update the region size 560 pipeline.swap(segments, null, false, false); 561 } 562 563 private void pushPipelineToSnapshot() { 564 int iterationsCnt = 0; 565 boolean done = false; 566 while (!done) { 567 iterationsCnt++; 568 VersionedSegmentsList segments = getImmutableSegments(); 569 pushToSnapshot(segments.getStoreSegments()); 570 // swap can return false in case the pipeline was updated by ongoing compaction 571 // and the version increase, the chance of it happenning is very low 572 // In Swap: don't close segments (they are in snapshot now) and don't update the region size 573 done = swapPipelineWithNull(segments); 574 if (iterationsCnt > 2) { 575 // practically it is impossible that this loop iterates more than two times 576 // (because the compaction is stopped and none restarts it while in snapshot request), 577 // however stopping here for the case of the infinite loop causing by any error 578 LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," 579 + " while flushing to disk."); 580 this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator()); 581 break; 582 } 583 } 584 } 585 586 protected boolean swapPipelineWithNull(VersionedSegmentsList segments) { 587 return pipeline.swap(segments, null, false, false); 588 } 589 590 private void pushToSnapshot(List<ImmutableSegment> segments) { 591 if (segments.isEmpty()) return; 592 if (segments.size() == 1 && !segments.get(0).isEmpty()) { 593 this.snapshot = segments.get(0); 594 return; 595 } else { // create composite snapshot 596 this.snapshot = 597 SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments); 598 } 599 } 600 601 private RegionServicesForStores getRegionServices() { 602 return regionServices; 603 } 604 605 /** 606 * The in-memory-flusher thread performs the flush asynchronously. There is at most one thread per 607 * memstore instance. It takes the updatesLock exclusively, pushes active into the pipeline, 608 * releases updatesLock and compacts the pipeline. 609 */ 610 private class InMemoryCompactionRunnable implements Runnable { 611 @Override 612 public void run() { 613 inMemoryCompaction(); 614 } 615 } 616 617 boolean isMemStoreFlushingInMemory() { 618 return inMemoryCompactionInProgress.get(); 619 } 620 621 /** 622 * @param cell Find the row that comes after this one. If null, we return the first. 623 * @return Next row or null if none found. 624 */ 625 ExtendedCell getNextRow(final ExtendedCell cell) { 626 ExtendedCell lowest = null; 627 List<Segment> segments = getSegments(); 628 for (Segment segment : segments) { 629 if (lowest == null) { 630 lowest = getNextRow(cell, segment.getCellSet()); 631 } else { 632 lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet())); 633 } 634 } 635 return lowest; 636 } 637 638 long getInmemoryFlushSize() { 639 return inmemoryFlushSize; 640 } 641 642 // debug method 643 public void debug() { 644 String msg = "active size=" + getActive().getDataSize(); 645 msg += " allow compaction is " + (allowCompaction.get() ? "true" : "false"); 646 msg += 647 " inMemoryCompactionInProgress is " + (inMemoryCompactionInProgress.get() ? "true" : "false"); 648 LOG.debug(msg); 649 } 650 651}