001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.atomic.AtomicBoolean; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.MemoryCompactionPolicy; 032import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.ClassSize; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.hbase.wal.WAL; 037import org.apache.hadoop.util.StringUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * A memstore implementation which supports in-memory compaction. 044 * A compaction pipeline is added between the active set and the snapshot data structures; 045 * it consists of a list of segments that are subject to compaction. 046 * Like the snapshot, all pipeline segments are read-only; updates only affect the active set. 047 * To ensure this property we take advantage of the existing blocking mechanism -- the active set 048 * is pushed to the pipeline while holding the region's updatesLock in exclusive mode. 049 * Periodically, a compaction is applied in the background to all pipeline segments resulting 050 * in a single read-only component. The ``old'' segments are discarded when no scanner is reading 051 * them. 052 */ 053@InterfaceAudience.Private 054public class CompactingMemStore extends AbstractMemStore { 055 056 // The external setting of the compacting MemStore behaviour 057 public static final String COMPACTING_MEMSTORE_TYPE_KEY = 058 "hbase.hregion.compacting.memstore.type"; 059 public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = 060 String.valueOf(MemoryCompactionPolicy.NONE); 061 // Default fraction of in-memory-flush size w.r.t. flush-to-disk size 062 public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = 063 "hbase.memstore.inmemoryflush.threshold.factor"; 064 private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1; 065 // In-Memory compaction pool size 066 public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY = 067 "hbase.regionserver.inmemory.compaction.pool.size"; 068 public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10; 069 070 private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class); 071 private HStore store; 072 private CompactionPipeline pipeline; 073 protected MemStoreCompactor compactor; 074 075 private long inmemoryFlushSize; // the threshold on active size for in-memory flush 076 private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false); 077 078 // inWalReplay is true while we are synchronously replaying the edits from WAL 079 private boolean inWalReplay = false; 080 081 protected final AtomicBoolean allowCompaction = new AtomicBoolean(true); 082 private boolean compositeSnapshot = true; 083 084 /** 085 * Types of indexes (part of immutable segments) to be used after flattening, 086 * compaction, or merge are applied. 087 */ 088 public enum IndexType { 089 CSLM_MAP, // ConcurrentSkipLisMap 090 ARRAY_MAP, // CellArrayMap 091 CHUNK_MAP // CellChunkMap 092 } 093 094 private IndexType indexType = IndexType.ARRAY_MAP; // default implementation 095 096 public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD 097 + 6 * ClassSize.REFERENCE // Store, CompactionPipeline, 098 // MemStoreCompactor, inMemoryCompactionInProgress, 099 // allowCompaction, indexType 100 + Bytes.SIZEOF_LONG // inmemoryFlushSize 101 + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay 102 + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction 103 + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD); 104 105 public CompactingMemStore(Configuration conf, CellComparator c, 106 HStore store, RegionServicesForStores regionServices, 107 MemoryCompactionPolicy compactionPolicy) throws IOException { 108 super(conf, c, regionServices); 109 this.store = store; 110 this.regionServices = regionServices; 111 this.pipeline = new CompactionPipeline(getRegionServices()); 112 this.compactor = createMemStoreCompactor(compactionPolicy); 113 if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) { 114 // if user requested to work with MSLABs (whether on- or off-heap), then the 115 // immutable segments are going to use CellChunkMap as their index 116 indexType = IndexType.CHUNK_MAP; 117 } else { 118 indexType = IndexType.ARRAY_MAP; 119 } 120 // initialization of the flush size should happen after initialization of the index type 121 // so do not transfer the following method 122 initInmemoryFlushSize(conf); 123 LOG.info("Store={}, in-memory flush size threshold={}, immutable segments index type={}, " + 124 "compactor={}", this.store.getColumnFamilyName(), 125 StringUtils.byteDesc(this.inmemoryFlushSize), this.indexType, 126 (this.compactor == null? "NULL": this.compactor.toString())); 127 } 128 129 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 130 throws IllegalArgumentIOException { 131 return new MemStoreCompactor(this, compactionPolicy); 132 } 133 134 private void initInmemoryFlushSize(Configuration conf) { 135 double factor = 0; 136 long memstoreFlushSize = getRegionServices().getMemStoreFlushSize(); 137 int numStores = getRegionServices().getNumStores(); 138 if (numStores <= 1) { 139 // Family number might also be zero in some of our unit test case 140 numStores = 1; 141 } 142 factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0); 143 if(factor != 0.0) { 144 // multiply by a factor (the same factor for all index types) 145 inmemoryFlushSize = (long) (factor * memstoreFlushSize) / numStores; 146 } else { 147 inmemoryFlushSize = IN_MEMORY_FLUSH_MULTIPLIER * 148 conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); 149 inmemoryFlushSize -= ChunkCreator.SIZEOF_CHUNK_HEADER; 150 } 151 } 152 153 /** 154 * @return Total memory occupied by this MemStore. This won't include any size occupied by the 155 * snapshot. We assume the snapshot will get cleared soon. This is not thread safe and 156 * the memstore may be changed while computing its size. It is the responsibility of the 157 * caller to make sure this doesn't happen. 158 */ 159 @Override 160 public MemStoreSize size() { 161 MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); 162 memstoreSizing.incMemStoreSize(getActive().getMemStoreSize()); 163 for (Segment item : pipeline.getSegments()) { 164 memstoreSizing.incMemStoreSize(item.getMemStoreSize()); 165 } 166 return memstoreSizing.getMemStoreSize(); 167 } 168 169 /** 170 * This method is called before the flush is executed. 171 * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush 172 * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}. 173 */ 174 @Override 175 public long preFlushSeqIDEstimation() { 176 if(compositeSnapshot) { 177 return HConstants.NO_SEQNUM; 178 } 179 Segment segment = getLastSegment(); 180 if(segment == null) { 181 return HConstants.NO_SEQNUM; 182 } 183 return segment.getMinSequenceId(); 184 } 185 186 @Override 187 public boolean isSloppy() { 188 return true; 189 } 190 191 /** 192 * Push the current active memstore segment into the pipeline 193 * and create a snapshot of the tail of current compaction pipeline 194 * Snapshot must be cleared by call to {@link #clearSnapshot}. 195 * {@link #clearSnapshot(long)}. 196 * @return {@link MemStoreSnapshot} 197 */ 198 @Override 199 public MemStoreSnapshot snapshot() { 200 // If snapshot currently has entries, then flusher failed or didn't call 201 // cleanup. Log a warning. 202 if (!this.snapshot.isEmpty()) { 203 LOG.warn("Snapshot called again without clearing previous. " + 204 "Doing nothing. Another ongoing flush or did we fail last attempt?"); 205 } else { 206 LOG.debug("FLUSHING TO DISK {}, store={}", 207 getRegionServices().getRegionInfo().getEncodedName(), getFamilyName()); 208 stopCompaction(); 209 // region level lock ensures pushing active to pipeline is done in isolation 210 // no concurrent update operations trying to flush the active segment 211 pushActiveToPipeline(getActive()); 212 resetTimeOfOldestEdit(); 213 snapshotId = EnvironmentEdgeManager.currentTime(); 214 // in both cases whatever is pushed to snapshot is cleared from the pipeline 215 if (compositeSnapshot) { 216 pushPipelineToSnapshot(); 217 } else { 218 pushTailToSnapshot(); 219 } 220 compactor.resetStats(); 221 } 222 return new MemStoreSnapshot(snapshotId, this.snapshot); 223 } 224 225 @Override 226 public MemStoreSize getFlushableSize() { 227 MemStoreSize mss = getSnapshotSize(); 228 if (mss.getDataSize() == 0) { 229 // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed 230 if (compositeSnapshot) { 231 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize()); 232 MutableSegment currActive = getActive(); 233 if(!currActive.isEmpty()) { 234 memStoreSizing.incMemStoreSize(currActive.getMemStoreSize()); 235 } 236 mss = memStoreSizing.getMemStoreSize(); 237 } else { 238 mss = pipeline.getTailSize(); 239 } 240 } 241 return mss.getDataSize() > 0? mss: getActive().getMemStoreSize(); 242 } 243 244 245 public void setInMemoryCompactionCompleted() { 246 inMemoryCompactionInProgress.set(false); 247 } 248 249 protected boolean setInMemoryCompactionFlag() { 250 return inMemoryCompactionInProgress.compareAndSet(false, true); 251 } 252 253 @Override 254 protected long keySize() { 255 // Need to consider dataSize/keySize of all segments in pipeline and active 256 long keySize = getActive().getDataSize(); 257 for (Segment segment : this.pipeline.getSegments()) { 258 keySize += segment.getDataSize(); 259 } 260 return keySize; 261 } 262 263 @Override 264 protected long heapSize() { 265 // Need to consider heapOverhead of all segments in pipeline and active 266 long h = getActive().getHeapSize(); 267 for (Segment segment : this.pipeline.getSegments()) { 268 h += segment.getHeapSize(); 269 } 270 return h; 271 } 272 273 @Override 274 public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) { 275 long minSequenceId = pipeline.getMinSequenceId(); 276 if(minSequenceId != Long.MAX_VALUE) { 277 byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes(); 278 byte[] familyName = getFamilyNameInBytes(); 279 WAL WAL = getRegionServices().getWAL(); 280 if (WAL != null) { 281 WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater); 282 } 283 } 284 } 285 286 /** 287 * This message intends to inform the MemStore that next coming updates 288 * are going to be part of the replaying edits from WAL 289 */ 290 @Override 291 public void startReplayingFromWAL() { 292 inWalReplay = true; 293 } 294 295 /** 296 * This message intends to inform the MemStore that the replaying edits from WAL 297 * are done 298 */ 299 @Override 300 public void stopReplayingFromWAL() { 301 inWalReplay = false; 302 } 303 304 /** 305 * Issue any synchronization and test needed before applying the update 306 * For compacting memstore this means checking the update can increase the size without 307 * overflow 308 * @param currentActive the segment to be updated 309 * @param cell the cell to be added 310 * @param memstoreSizing object to accumulate region size changes 311 * @return true iff can proceed with applying the update 312 */ 313 @Override 314 protected boolean preUpdate(MutableSegment currentActive, Cell cell, 315 MemStoreSizing memstoreSizing) { 316 if (currentActive.sharedLock()) { 317 if (checkAndAddToActiveSize(currentActive, cell, memstoreSizing)) { 318 return true; 319 } 320 currentActive.sharedUnlock(); 321 } 322 return false; 323 } 324 325 @Override protected void postUpdate(MutableSegment currentActive) { 326 currentActive.sharedUnlock(); 327 } 328 329 @Override protected boolean sizeAddedPreOperation() { 330 return true; 331 } 332 333 // the getSegments() method is used for tests only 334 @Override 335 protected List<Segment> getSegments() { 336 List<? extends Segment> pipelineList = pipeline.getSegments(); 337 List<Segment> list = new ArrayList<>(pipelineList.size() + 2); 338 list.add(getActive()); 339 list.addAll(pipelineList); 340 list.addAll(snapshot.getAllSegments()); 341 342 return list; 343 } 344 345 // the following three methods allow to manipulate the settings of composite snapshot 346 public void setCompositeSnapshot(boolean useCompositeSnapshot) { 347 this.compositeSnapshot = useCompositeSnapshot; 348 } 349 350 public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, 351 boolean merge) { 352 // last true stands for updating the region size 353 return pipeline.swap(versionedList, result, !merge, true); 354 } 355 356 /** 357 * @param requesterVersion The caller must hold the VersionedList of the pipeline 358 * with version taken earlier. This version must be passed as a parameter here. 359 * The flattening happens only if versions match. 360 */ 361 public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) { 362 pipeline.flattenOneSegment(requesterVersion, indexType, action); 363 } 364 365 // setter is used only for testability 366 void setIndexType(IndexType type) { 367 indexType = type; 368 // Because this functionality is for testing only and tests are setting in-memory flush size 369 // according to their need, there is no setting of in-memory flush size, here. 370 // If it is needed, please change in-memory flush size explicitly 371 } 372 373 public IndexType getIndexType() { 374 return indexType; 375 } 376 377 public boolean hasImmutableSegments() { 378 return !pipeline.isEmpty(); 379 } 380 381 public VersionedSegmentsList getImmutableSegments() { 382 return pipeline.getVersionedList(); 383 } 384 385 public long getSmallestReadPoint() { 386 return store.getSmallestReadPoint(); 387 } 388 389 public HStore getStore() { 390 return store; 391 } 392 393 public String getFamilyName() { 394 return Bytes.toString(getFamilyNameInBytes()); 395 } 396 397 @Override 398 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 399 MutableSegment activeTmp = getActive(); 400 List<? extends Segment> pipelineList = pipeline.getSegments(); 401 List<? extends Segment> snapshotList = snapshot.getAllSegments(); 402 long numberOfSegments = 1L + pipelineList.size() + snapshotList.size(); 403 // The list of elements in pipeline + the active element + the snapshot segment 404 List<KeyValueScanner> list = createList((int) numberOfSegments); 405 addToScanners(activeTmp, readPt, list); 406 addToScanners(pipelineList, readPt, list); 407 addToScanners(snapshotList, readPt, list); 408 return list; 409 } 410 411 protected List<KeyValueScanner> createList(int capacity) { 412 return new ArrayList<>(capacity); 413 } 414 415 /** 416 * Check whether anything need to be done based on the current active set size. 417 * The method is invoked upon every addition to the active set. 418 * For CompactingMemStore, flush the active set to the read-only memory if it's 419 * size is above threshold 420 * @param currActive intended segment to update 421 * @param cellToAdd cell to be added to the segment 422 * @param memstoreSizing object to accumulate changed size 423 * @return true if the cell can be added to the 424 */ 425 private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd, 426 MemStoreSizing memstoreSizing) { 427 if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) { 428 if (currActive.setInMemoryFlushed()) { 429 flushInMemory(currActive); 430 if (setInMemoryCompactionFlag()) { 431 // The thread is dispatched to do in-memory compaction in the background 432 InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable(); 433 if (LOG.isTraceEnabled()) { 434 LOG.trace("Dispatching the MemStore in-memory flush for store " + store 435 .getColumnFamilyName()); 436 } 437 getPool().execute(runnable); 438 } 439 } 440 return false; 441 } 442 return true; 443 } 444 445 // externally visible only for tests 446 // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock, 447 // otherwise there is a deadlock 448 void flushInMemory() { 449 MutableSegment currActive = getActive(); 450 if(currActive.setInMemoryFlushed()) { 451 flushInMemory(currActive); 452 } 453 inMemoryCompaction(); 454 } 455 456 private void flushInMemory(MutableSegment currActive) { 457 LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); 458 pushActiveToPipeline(currActive); 459 } 460 461 void inMemoryCompaction() { 462 // setting the inMemoryCompactionInProgress flag again for the case this method is invoked 463 // directly (only in tests) in the common path setting from true to true is idempotent 464 inMemoryCompactionInProgress.set(true); 465 // Used by tests 466 if (!allowCompaction.get()) { 467 return; 468 } 469 try { 470 // Speculative compaction execution, may be interrupted if flush is forced while 471 // compaction is in progress 472 if(!compactor.start()) { 473 setInMemoryCompactionCompleted(); 474 } 475 } catch (IOException e) { 476 LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}", 477 getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e); 478 } 479 } 480 481 private Segment getLastSegment() { 482 Segment localActive = getActive(); 483 Segment tail = pipeline.getTail(); 484 return tail == null ? localActive : tail; 485 } 486 487 private byte[] getFamilyNameInBytes() { 488 return store.getColumnFamilyDescriptor().getName(); 489 } 490 491 private ThreadPoolExecutor getPool() { 492 return getRegionServices().getInMemoryCompactionPool(); 493 } 494 495 protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd, 496 MemStoreSizing memstoreSizing) { 497 long cellSize = MutableSegment.getCellLength(cellToAdd); 498 long segmentDataSize = currActive.getDataSize(); 499 while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) { 500 // when replaying edits from WAL there is no need in in-memory flush regardless the size 501 // otherwise size below flush threshold try to update atomically 502 if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) { 503 if (memstoreSizing != null) { 504 memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0); 505 } 506 // enough space for cell - no need to flush 507 return false; 508 } 509 segmentDataSize = currActive.getDataSize(); 510 } 511 // size above flush threshold 512 return true; 513 } 514 515 /** 516 * The request to cancel the compaction asynchronous task (caused by in-memory flush) 517 * The compaction may still happen if the request was sent too late 518 * Non-blocking request 519 */ 520 private void stopCompaction() { 521 if (inMemoryCompactionInProgress.get()) { 522 compactor.stop(); 523 } 524 } 525 526 protected void pushActiveToPipeline(MutableSegment currActive) { 527 if (!currActive.isEmpty()) { 528 pipeline.pushHead(currActive); 529 resetActive(); 530 } 531 } 532 533 private void pushTailToSnapshot() { 534 VersionedSegmentsList segments = pipeline.getVersionedTail(); 535 pushToSnapshot(segments.getStoreSegments()); 536 // In Swap: don't close segments (they are in snapshot now) and don't update the region size 537 pipeline.swap(segments,null,false, false); 538 } 539 540 private void pushPipelineToSnapshot() { 541 int iterationsCnt = 0; 542 boolean done = false; 543 while (!done) { 544 iterationsCnt++; 545 VersionedSegmentsList segments = pipeline.getVersionedList(); 546 pushToSnapshot(segments.getStoreSegments()); 547 // swap can return false in case the pipeline was updated by ongoing compaction 548 // and the version increase, the chance of it happenning is very low 549 // In Swap: don't close segments (they are in snapshot now) and don't update the region size 550 done = pipeline.swap(segments, null, false, false); 551 if (iterationsCnt>2) { 552 // practically it is impossible that this loop iterates more than two times 553 // (because the compaction is stopped and none restarts it while in snapshot request), 554 // however stopping here for the case of the infinite loop causing by any error 555 LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," + 556 " while flushing to disk."); 557 this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator()); 558 break; 559 } 560 } 561 } 562 563 private void pushToSnapshot(List<ImmutableSegment> segments) { 564 if(segments.isEmpty()) return; 565 if(segments.size() == 1 && !segments.get(0).isEmpty()) { 566 this.snapshot = segments.get(0); 567 return; 568 } else { // create composite snapshot 569 this.snapshot = 570 SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments); 571 } 572 } 573 574 private RegionServicesForStores getRegionServices() { 575 return regionServices; 576 } 577 578 /** 579 * The in-memory-flusher thread performs the flush asynchronously. 580 * There is at most one thread per memstore instance. 581 * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock 582 * and compacts the pipeline. 583 */ 584 private class InMemoryCompactionRunnable implements Runnable { 585 @Override 586 public void run() { 587 inMemoryCompaction(); 588 } 589 } 590 591 boolean isMemStoreFlushingInMemory() { 592 return inMemoryCompactionInProgress.get(); 593 } 594 595 /** 596 * @param cell Find the row that comes after this one. If null, we return the 597 * first. 598 * @return Next row or null if none found. 599 */ 600 Cell getNextRow(final Cell cell) { 601 Cell lowest = null; 602 List<Segment> segments = getSegments(); 603 for (Segment segment : segments) { 604 if (lowest == null) { 605 lowest = getNextRow(cell, segment.getCellSet()); 606 } else { 607 lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet())); 608 } 609 } 610 return lowest; 611 } 612 613 long getInmemoryFlushSize() { 614 return inmemoryFlushSize; 615 } 616 617 // debug method 618 public void debug() { 619 String msg = "active size=" + getActive().getDataSize(); 620 msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); 621 msg += " inMemoryCompactionInProgress is "+ (inMemoryCompactionInProgress.get() ? "true" : 622 "false"); 623 LOG.debug(msg); 624 } 625 626}