/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;

/**
 * A memstore implementation which supports in-memory compaction.
 * A compaction pipeline is added between the active set and the snapshot data structures;
 * it consists of a list of segments that are subject to compaction.
 * Like the snapshot, all pipeline segments are read-only; updates only affect the active set.
 * To ensure this property we take advantage of the existing blocking mechanism -- the active set
 * is pushed to the pipeline while holding the region's updatesLock in exclusive mode.
 * Periodically, a compaction is applied in the background to all pipeline segments resulting
 * in a single read-only component. The ``old'' segments are discarded when no scanner is reading
 * them.
 */
@InterfaceAudience.Private
public class CompactingMemStore extends AbstractMemStore {

  // The external setting of the compacting MemStore behaviour
  public static final String COMPACTING_MEMSTORE_TYPE_KEY =
      "hbase.hregion.compacting.memstore.type";
  public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
      String.valueOf(MemoryCompactionPolicy.NONE);
  // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
  public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
      "hbase.memstore.inmemoryflush.threshold.factor";
  private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014;
  // In-Memory compaction pool size
  // NOTE: "CONPACTION" is a historical misspelling; the names are public and kept as-is so that
  // existing references keep compiling.
  public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
      "hbase.regionserver.inmemory.compaction.pool.size";
  public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
  private HStore store;
  private CompactionPipeline pipeline;
  protected MemStoreCompactor compactor;

  private long inmemoryFlushSize;       // the threshold on active size for in-memory flush
  private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);

  // inWalReplay is true while we are synchronously replaying the edits from WAL
  private boolean inWalReplay = false;

  @VisibleForTesting
  protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
  private boolean compositeSnapshot = true;

  /**
   * Types of indexes (part of immutable segments) to be used after flattening,
   * compaction, or merge are applied.
   */
  public enum IndexType {
    CSLM_MAP,   // ConcurrentSkipListMap
    ARRAY_MAP,  // CellArrayMap
    CHUNK_MAP   // CellChunkMap
  }

  private IndexType indexType = IndexType.ARRAY_MAP;  // default implementation

  public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
      + 6 * ClassSize.REFERENCE     // Store, CompactionPipeline,
                                    // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
                                    // indexType
      + Bytes.SIZEOF_LONG           // inmemoryFlushSize
      + 2 * Bytes.SIZEOF_BOOLEAN    // compositeSnapshot and inWalReplay
      + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
      + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);

  /**
   * Creates the memstore together with its compaction pipeline and compactor, picks the index
   * type of the immutable segments according to the MSLAB setting and computes the in-memory
   * flush threshold.
   * @param conf configuration read for the MSLAB switch and the in-memory flush factor
   * @param c comparator handed to the parent memstore
   * @param store the store this memstore belongs to
   * @param regionServices region-level services (flush size, number of stores, WAL, pool, ...)
   * @param compactionPolicy policy handed to the {@link MemStoreCompactor}
   * @throws IOException if the compactor cannot be created
   */
  public CompactingMemStore(Configuration conf, CellComparator c,
      HStore store, RegionServicesForStores regionServices,
      MemoryCompactionPolicy compactionPolicy) throws IOException {
    super(conf, c, regionServices);
    this.store = store;
    this.regionServices = regionServices;
    this.pipeline = new CompactionPipeline(getRegionServices());
    this.compactor = createMemStoreCompactor(compactionPolicy);
    if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
      // if user requested to work with MSLABs (whether on- or off-heap), then the
      // immutable segments are going to use CellChunkMap as their index
      indexType = IndexType.CHUNK_MAP;
    } else {
      indexType = IndexType.ARRAY_MAP;
    }
    // initialization of the flush size should happen after initialization of the index type
    // so do not transfer the following method
    initInmemoryFlushSize(conf);
    LOG.info("Store={}, in-memory flush size threshold={}, immutable segments index type={}, " +
            "compactor={}", this.store.getColumnFamilyName(),
        StringUtils.byteDesc(this.inmemoryFlushSize), this.indexType,
        (this.compactor == null? "NULL": this.compactor.toString()));
  }

  /**
   * Factory for the compactor; a separate protected method so that tests can override it.
   */
  @VisibleForTesting
  protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
      throws IllegalArgumentIOException {
    return new MemStoreCompactor(this, compactionPolicy);
  }

  /**
   * Sets {@link #inmemoryFlushSize} to (region flush size / number of stores) multiplied by the
   * configured {@link #IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY} factor.
   */
  private void initInmemoryFlushSize(Configuration conf) {
    long memstoreFlushSize = getRegionServices().getMemStoreFlushSize();
    int numStores = getRegionServices().getNumStores();
    if (numStores <= 1) {
      // Family number might also be zero in some of our unit test case
      numStores = 1;
    }
    inmemoryFlushSize = memstoreFlushSize / numStores;
    // multiply by a factor (the same factor for all index types)
    double factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
        IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);

    inmemoryFlushSize = (long) (inmemoryFlushSize * factor);
  }

  /**
   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and
   *         the memstore may be changed while computing its size. It is the responsibility of the
   *         caller to make sure this doesn't happen.
   */
  @Override
  public MemStoreSize size() {
    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
    memstoreSizing.incMemStoreSize(active.getMemStoreSize());
    for (Segment item : pipeline.getSegments()) {
      memstoreSizing.incMemStoreSize(item.getMemStoreSize());
    }
    return memstoreSizing.getMemStoreSize();
  }

  /**
   * This method is called before the flush is executed.
   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
   *         is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
   */
  @Override
  public long preFlushSeqIDEstimation() {
    if (compositeSnapshot) {
      return HConstants.NO_SEQNUM;
    }
    Segment segment = getLastSegment();
    if (segment == null) {
      return HConstants.NO_SEQNUM;
    }
    return segment.getMinSequenceId();
  }

  @Override
  public boolean isSloppy() {
    return true;
  }

  /**
   * Push the current active memstore segment into the pipeline
   * and create a snapshot of the tail of current compaction pipeline
   * (or of the whole pipeline when the composite snapshot is enabled).
   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}.
   * @return {@link MemStoreSnapshot}
   */
  @Override
  public MemStoreSnapshot snapshot() {
    // If snapshot currently has entries, then flusher failed or didn't call
    // cleanup.  Log a warning.
    if (!this.snapshot.isEmpty()) {
      LOG.warn("Snapshot called again without clearing previous. " +
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
    } else {
      LOG.debug("FLUSHING TO DISK {}, store={}",
          getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
      stopCompaction();
      pushActiveToPipeline(this.active);
      snapshotId = EnvironmentEdgeManager.currentTime();
      // in both cases whatever is pushed to snapshot is cleared from the pipeline
      if (compositeSnapshot) {
        pushPipelineToSnapshot();
      } else {
        pushTailToSnapshot();
      }
      compactor.resetStats();
    }
    return new MemStoreSnapshot(snapshotId, this.snapshot);
  }

  /**
   * @return the size of the snapshot if it holds data; otherwise the size of what the next
   *         snapshot would take (whole pipeline plus active for a composite snapshot, the
   *         pipeline tail otherwise), falling back to the active segment size when that is empty
   */
  @Override
  public MemStoreSize getFlushableSize() {
    MemStoreSize mss = getSnapshotSize();
    if (mss.getDataSize() == 0) {
      // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
      if (compositeSnapshot) {
        MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
        memStoreSizing.incMemStoreSize(this.active.getMemStoreSize());
        mss = memStoreSizing.getMemStoreSize();
      } else {
        mss = pipeline.getTailSize();
      }
    }
    return mss.getDataSize() > 0? mss: this.active.getMemStoreSize();
  }

  @Override
  protected long keySize() {
    // Need to consider dataSize/keySize of all segments in pipeline and active
    long keySize = this.active.getDataSize();
    for (Segment segment : this.pipeline.getSegments()) {
      keySize += segment.getDataSize();
    }
    return keySize;
  }

  @Override
  protected long heapSize() {
    // Need to consider heapOverhead of all segments in pipeline and active
    long h = this.active.getHeapSize();
    for (Segment segment : this.pipeline.getSegments()) {
      h += segment.getHeapSize();
    }
    return h;
  }

  /**
   * Reports the minimal sequence id held by the pipeline to the WAL (if there is a WAL).
   * Nothing is reported when the pipeline returns {@code Long.MAX_VALUE} as its minimum.
   */
  @Override
  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
    long minSequenceId = pipeline.getMinSequenceId();
    if (minSequenceId != Long.MAX_VALUE) {
      byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
      byte[] familyName = getFamilyNameInBytes();
      WAL wal = getRegionServices().getWAL();
      if (wal != null) {
        wal.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
      }
    }
  }

  /**
   * This message intends to inform the MemStore that next coming updates
   * are going to be part of the replaying edits from WAL
   */
  @Override
  public void startReplayingFromWAL() {
    inWalReplay = true;
  }

  /**
   * This message intends to inform the MemStore that the replaying edits from WAL
   * are done
   */
  @Override
  public void stopReplayingFromWAL() {
    inWalReplay = false;
  }

  // the getSegments() method is used for tests only
  @VisibleForTesting
  @Override
  protected List<Segment> getSegments() {
    List<? extends Segment> pipelineList = pipeline.getSegments();
    List<Segment> list = new ArrayList<>(pipelineList.size() + 2);
    list.add(this.active);
    list.addAll(pipelineList);
    list.addAll(this.snapshot.getAllSegments());

    return list;
  }

  // the following method allows to manipulate the setting of composite snapshot
  public void setCompositeSnapshot(boolean useCompositeSnapshot) {
    this.compositeSnapshot = useCompositeSnapshot;
  }

  /**
   * Replaces the segments of {@code versionedList} in the pipeline with {@code result}.
   * @return the value returned by the pipeline swap
   */
  public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
      boolean merge) {
    // last true stands for updating the region size
    return pipeline.swap(versionedList, result, !merge, true);
  }

  /**
   * @param requesterVersion The caller must hold the VersionedList of the pipeline
   *           with version taken earlier. This version must be passed as a parameter here.
   *           The flattening happens only if versions match.
   */
  public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) {
    pipeline.flattenOneSegment(requesterVersion, indexType, action);
  }

  // setter is used only for testability
  @VisibleForTesting
  void setIndexType(IndexType type) {
    indexType = type;
    // Because this functionality is for testing only and tests are setting in-memory flush size
    // according to their need, there is no setting of in-memory flush size, here.
    // If it is needed, please change in-memory flush size explicitly
  }

  public IndexType getIndexType() {
    return indexType;
  }

  public boolean hasImmutableSegments() {
    return !pipeline.isEmpty();
  }

  public VersionedSegmentsList getImmutableSegments() {
    return pipeline.getVersionedList();
  }

  public long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  public HStore getStore() {
    return store;
  }

  public String getFamilyName() {
    return Bytes.toString(getFamilyNameInBytes());
  }

  @Override
  /*
   * Scanners are ordered from 0 (oldest) to newest in increasing order.
   */
  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
    MutableSegment activeTmp = active;
    List<? extends Segment> pipelineList = pipeline.getSegments();
    List<? extends Segment> snapshotList = snapshot.getAllSegments();
    long order = 1L + pipelineList.size() + snapshotList.size();
    // The list of elements in pipeline + the active element + the snapshot segment
    // The order is the Segment ordinal
    List<KeyValueScanner> list = createList((int) order);
    order = addToScanners(activeTmp, readPt, order, list);
    order = addToScanners(pipelineList, readPt, order, list);
    addToScanners(snapshotList, readPt, order, list);
    return list;
  }

  @VisibleForTesting
  protected List<KeyValueScanner> createList(int capacity) {
    return new ArrayList<>(capacity);
  }

  /**
   * Check whether anything need to be done based on the current active set size.
   * The method is invoked upon every addition to the active set.
   * For CompactingMemStore, flush the active set to the read-only memory if its
   * size is above threshold
   */
  @Override
  protected void checkActiveSize() {
    if (shouldFlushInMemory()) {
      /* The thread is dispatched to flush-in-memory. This cannot be done
      * on the same thread, because for flush-in-memory we require updatesLock
      * in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock
      * in the shared mode. */
      InMemoryFlushRunnable runnable = new InMemoryFlushRunnable();
      LOG.trace("Dispatching the MemStore in-memory flush for store {}",
          store.getColumnFamilyName());
      try {
        getPool().execute(runnable);
      } catch (RuntimeException e) {
        // shouldFlushInMemory() has already CASed inMemoryFlushInProgress to true and only the
        // dispatched runnable sets it back. If the task was never accepted, release the flag
        // here so that later in-memory flushes are not blocked, then propagate as before.
        inMemoryFlushInProgress.set(false);
        throw e;
      }
    }
  }

  // internally used method, externally visible only for tests
  // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
  // otherwise there is a deadlock
  @VisibleForTesting
  void flushInMemory() throws IOException {
    // setting the inMemoryFlushInProgress flag again for the case this method is invoked
    // directly (only in tests) in the common path setting from true to true is idempotent
    inMemoryFlushInProgress.set(true);
    try {
      // Phase I: Update the pipeline
      getRegionServices().blockUpdates();
      try {
        LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
        pushActiveToPipeline(this.active);
      } finally {
        getRegionServices().unblockUpdates();
      }

      // Used by tests
      if (!allowCompaction.get()) {
        return;
      }
      // Phase II: Compact the pipeline
      try {
        // Speculative compaction execution, may be interrupted if flush is forced while
        // compaction is in progress
        compactor.start();
      } catch (IOException e) {
        // The trailing Throwable is logged by SLF4J as the exception (with stack trace); it is
        // never substituted into a placeholder, so the message has none for it.
        LOG.warn("Unable to run in-memory compaction on {}/{}",
            getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
      }
    } finally {
      inMemoryFlushInProgress.set(false);
      LOG.trace("IN-MEMORY FLUSH: end");
    }
  }

  /**
   * @return the tail of the pipeline, or the active segment when the pipeline has no tail
   */
  private Segment getLastSegment() {
    Segment localActive = getActive();
    Segment tail = pipeline.getTail();
    return tail == null ? localActive : tail;
  }

  private byte[] getFamilyNameInBytes() {
    return store.getColumnFamilyDescriptor().getName();
  }

  private ThreadPoolExecutor getPool() {
    return getRegionServices().getInMemoryCompactionPool();
  }

  /**
   * @return true only if the active segment data size exceeds the in-memory flush threshold,
   *         no WAL replay is going on, and this caller is the one that switched
   *         {@code inMemoryFlushInProgress} from false to true
   */
  @VisibleForTesting
  protected boolean shouldFlushInMemory() {
    if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold
      if (inWalReplay) {  // when replaying edits from WAL there is no need in in-memory flush
        return false;     // regardless the size
      }
      // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
      // the insert of the active into the compaction pipeline
      return inMemoryFlushInProgress.compareAndSet(false, true);
    }
    return false;
  }

  /**
   * The request to cancel the compaction asynchronous task (caused by in-memory flush)
   * The compaction may still happen if the request was sent too late
   * Non-blocking request
   */
  private void stopCompaction() {
    if (inMemoryFlushInProgress.get()) {
      compactor.stop();
    }
  }

  /**
   * Moves a non-empty active segment to the head of the pipeline and installs a fresh active
   * segment; does nothing for an empty segment.
   */
  protected void pushActiveToPipeline(MutableSegment active) {
    if (!active.isEmpty()) {
      pipeline.pushHead(active);
      resetActive();
    }
  }

  private void pushTailToSnapshot() {
    VersionedSegmentsList segments = pipeline.getVersionedTail();
    pushToSnapshot(segments.getStoreSegments());
    // In Swap: don't close segments (they are in snapshot now) and don't update the region size
    pipeline.swap(segments, null, false, false);
  }

  private void pushPipelineToSnapshot() {
    int iterationsCnt = 0;
    boolean done = false;
    while (!done) {
      iterationsCnt++;
      VersionedSegmentsList segments = pipeline.getVersionedList();
      pushToSnapshot(segments.getStoreSegments());
      // swap can return false in case the pipeline was updated by ongoing compaction
      // and the version increase, the chance of it happening is very low
      // In Swap: don't close segments (they are in snapshot now) and don't update the region size
      done = pipeline.swap(segments, null, false, false);
      if (iterationsCnt > 2) {
        // practically it is impossible that this loop iterates more than two times
        // (because the compaction is stopped and none restarts it while in snapshot request),
        // however stopping here for the case of the infinite loop causing by any error
        LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," +
            " while flushing to disk.");
        this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
        break;
      }
    }
  }

  /**
   * Installs the given segments as the snapshot: a single non-empty segment is used directly,
   * anything else is wrapped into a composite immutable segment. An empty list leaves the
   * snapshot untouched.
   */
  private void pushToSnapshot(List<ImmutableSegment> segments) {
    if (segments.isEmpty()) {
      return;
    }
    if (segments.size() == 1 && !segments.get(0).isEmpty()) {
      this.snapshot = segments.get(0);
    } else { // create composite snapshot
      this.snapshot =
          SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
    }
  }

  private RegionServicesForStores getRegionServices() {
    return regionServices;
  }

  /**
  * The in-memory-flusher thread performs the flush asynchronously.
  * There is at most one thread per memstore instance.
  * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
  * and compacts the pipeline.
  */
  private class InMemoryFlushRunnable implements Runnable {

    @Override
    public void run() {
      try {
        flushInMemory();
      } catch (IOException e) {
        LOG.warn("Unable to run memstore compaction. region {} store: {}",
            getRegionServices().getRegionInfo().getRegionNameAsString(), getFamilyName(), e);
      }
    }
  }

  @VisibleForTesting
  boolean isMemStoreFlushingInMemory() {
    return inMemoryFlushInProgress.get();
  }

  /**
   * @param cell Find the row that comes after this one.  If null, we return the
   *             first.
   * @return Next row or null if none found.
   */
  Cell getNextRow(final Cell cell) {
    Cell lowest = null;
    List<Segment> segments = getSegments();
    for (Segment segment : segments) {
      if (lowest == null) {
        lowest = getNextRow(cell, segment.getCellSet());
      } else {
        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
      }
    }
    return lowest;
  }

  @VisibleForTesting
  long getInmemoryFlushSize() {
    return inmemoryFlushSize;
  }

  // debug method
  public void debug() {
    LOG.debug("active size={} in-memory flush size is {} allow compaction is {}"
        + " inMemoryFlushInProgress is {}", this.active.getDataSize(), inmemoryFlushSize,
        allowCompaction.get(), inMemoryFlushInProgress.get());
  }

}