001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 022import org.apache.hadoop.util.StringUtils; 023import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellComparator; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.MemoryCompactionPolicy; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.ClassSize; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.wal.WAL; 042 043/** 044 * A memstore implementation which supports in-memory compaction. 045 * A compaction pipeline is added between the active set and the snapshot data structures; 046 * it consists of a list of segments that are subject to compaction. 047 * Like the snapshot, all pipeline segments are read-only; updates only affect the active set. 048 * To ensure this property we take advantage of the existing blocking mechanism -- the active set 049 * is pushed to the pipeline while holding the region's updatesLock in exclusive mode. 050 * Periodically, a compaction is applied in the background to all pipeline segments resulting 051 * in a single read-only component. The ``old'' segments are discarded when no scanner is reading 052 * them. 053 */ 054@InterfaceAudience.Private 055public class CompactingMemStore extends AbstractMemStore { 056 057 // The external setting of the compacting MemStore behaviour 058 public static final String COMPACTING_MEMSTORE_TYPE_KEY = 059 "hbase.hregion.compacting.memstore.type"; 060 public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = 061 String.valueOf(MemoryCompactionPolicy.NONE); 062 // Default fraction of in-memory-flush size w.r.t. flush-to-disk size 063 public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = 064 "hbase.memstore.inmemoryflush.threshold.factor"; 065 private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014; 066 // In-Memory compaction pool size 067 public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY = 068 "hbase.regionserver.inmemory.compaction.pool.size"; 069 public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10; 070 071 private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class); 072 private HStore store; 073 private CompactionPipeline pipeline; 074 protected MemStoreCompactor compactor; 075 076 private long inmemoryFlushSize; // the threshold on active size for in-memory flush 077 private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false); 078 079 // inWalReplay is true while we are synchronously replaying the edits from WAL 080 private boolean inWalReplay = false; 081 082 @VisibleForTesting 083 protected final AtomicBoolean allowCompaction = new AtomicBoolean(true); 084 private boolean compositeSnapshot = true; 085 086 /** 087 * Types of indexes (part of immutable segments) to be used after flattening, 088 * compaction, or merge are applied. 089 */ 090 public enum IndexType { 091 CSLM_MAP, // ConcurrentSkipLisMap 092 ARRAY_MAP, // CellArrayMap 093 CHUNK_MAP // CellChunkMap 094 } 095 096 private IndexType indexType = IndexType.ARRAY_MAP; // default implementation 097 098 public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD 099 + 6 * ClassSize.REFERENCE // Store, CompactionPipeline, 100 // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction, 101 // indexType 102 + Bytes.SIZEOF_LONG // inmemoryFlushSize 103 + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay 104 + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction 105 + CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD); 106 107 public CompactingMemStore(Configuration conf, CellComparator c, 108 HStore store, RegionServicesForStores regionServices, 109 MemoryCompactionPolicy compactionPolicy) throws IOException { 110 super(conf, c, regionServices); 111 this.store = store; 112 this.regionServices = regionServices; 113 this.pipeline = new CompactionPipeline(getRegionServices()); 114 this.compactor = createMemStoreCompactor(compactionPolicy); 115 if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) { 116 // if user requested to work with MSLABs (whether on- or off-heap), then the 117 // immutable segments are going to use CellChunkMap as their index 118 indexType = IndexType.CHUNK_MAP; 119 } else { 120 indexType = IndexType.ARRAY_MAP; 121 } 122 // initialization of the flush size should happen after initialization of the index type 123 // so do not transfer the following method 124 initInmemoryFlushSize(conf); 125 LOG.info("Store={}, in-memory flush size threshold={}, immutable segments index type={}, " + 126 "compactor={}", this.store.getColumnFamilyName(), 127 StringUtils.byteDesc(this.inmemoryFlushSize), this.indexType, 128 (this.compactor == null? "NULL": this.compactor.toString())); 129 } 130 131 @VisibleForTesting 132 protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) 133 throws IllegalArgumentIOException { 134 return new MemStoreCompactor(this, compactionPolicy); 135 } 136 137 private void initInmemoryFlushSize(Configuration conf) { 138 double factor = 0; 139 long memstoreFlushSize = getRegionServices().getMemStoreFlushSize(); 140 int numStores = getRegionServices().getNumStores(); 141 if (numStores <= 1) { 142 // Family number might also be zero in some of our unit test case 143 numStores = 1; 144 } 145 inmemoryFlushSize = memstoreFlushSize / numStores; 146 // multiply by a factor (the same factor for all index types) 147 factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 148 IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT); 149 150 inmemoryFlushSize = (long) (inmemoryFlushSize * factor); 151 } 152 153 /** 154 * @return Total memory occupied by this MemStore. This won't include any size occupied by the 155 * snapshot. We assume the snapshot will get cleared soon. This is not thread safe and 156 * the memstore may be changed while computing its size. It is the responsibility of the 157 * caller to make sure this doesn't happen. 158 */ 159 @Override 160 public MemStoreSize size() { 161 MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing(); 162 memstoreSizing.incMemStoreSize(active.getMemStoreSize()); 163 for (Segment item : pipeline.getSegments()) { 164 memstoreSizing.incMemStoreSize(item.getMemStoreSize()); 165 } 166 return memstoreSizing.getMemStoreSize(); 167 } 168 169 /** 170 * This method is called before the flush is executed. 171 * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush 172 * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}. 173 */ 174 @Override 175 public long preFlushSeqIDEstimation() { 176 if(compositeSnapshot) { 177 return HConstants.NO_SEQNUM; 178 } 179 Segment segment = getLastSegment(); 180 if(segment == null) { 181 return HConstants.NO_SEQNUM; 182 } 183 return segment.getMinSequenceId(); 184 } 185 186 @Override 187 public boolean isSloppy() { 188 return true; 189 } 190 191 /** 192 * Push the current active memstore segment into the pipeline 193 * and create a snapshot of the tail of current compaction pipeline 194 * Snapshot must be cleared by call to {@link #clearSnapshot}. 195 * {@link #clearSnapshot(long)}. 196 * @return {@link MemStoreSnapshot} 197 */ 198 @Override 199 public MemStoreSnapshot snapshot() { 200 // If snapshot currently has entries, then flusher failed or didn't call 201 // cleanup. Log a warning. 202 if (!this.snapshot.isEmpty()) { 203 LOG.warn("Snapshot called again without clearing previous. " + 204 "Doing nothing. Another ongoing flush or did we fail last attempt?"); 205 } else { 206 LOG.debug("FLUSHING TO DISK {}, store={}", 207 getRegionServices().getRegionInfo().getEncodedName(), getFamilyName()); 208 stopCompaction(); 209 pushActiveToPipeline(this.active); 210 snapshotId = EnvironmentEdgeManager.currentTime(); 211 // in both cases whatever is pushed to snapshot is cleared from the pipeline 212 if (compositeSnapshot) { 213 pushPipelineToSnapshot(); 214 } else { 215 pushTailToSnapshot(); 216 } 217 compactor.resetStats(); 218 } 219 return new MemStoreSnapshot(snapshotId, this.snapshot); 220 } 221 222 @Override 223 public MemStoreSize getFlushableSize() { 224 MemStoreSize mss = getSnapshotSize(); 225 if (mss.getDataSize() == 0) { 226 // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed 227 if (compositeSnapshot) { 228 MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize()); 229 memStoreSizing.incMemStoreSize(this.active.getMemStoreSize()); 230 mss = memStoreSizing.getMemStoreSize(); 231 } else { 232 mss = pipeline.getTailSize(); 233 } 234 } 235 return mss.getDataSize() > 0? mss: this.active.getMemStoreSize(); 236 } 237 238 @Override 239 protected long keySize() { 240 // Need to consider dataSize/keySize of all segments in pipeline and active 241 long keySize = this.active.getDataSize(); 242 for (Segment segment : this.pipeline.getSegments()) { 243 keySize += segment.getDataSize(); 244 } 245 return keySize; 246 } 247 248 @Override 249 protected long heapSize() { 250 // Need to consider heapOverhead of all segments in pipeline and active 251 long h = this.active.getHeapSize(); 252 for (Segment segment : this.pipeline.getSegments()) { 253 h += segment.getHeapSize(); 254 } 255 return h; 256 } 257 258 @Override 259 public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) { 260 long minSequenceId = pipeline.getMinSequenceId(); 261 if(minSequenceId != Long.MAX_VALUE) { 262 byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes(); 263 byte[] familyName = getFamilyNameInBytes(); 264 WAL WAL = getRegionServices().getWAL(); 265 if (WAL != null) { 266 WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater); 267 } 268 } 269 } 270 271 /** 272 * This message intends to inform the MemStore that next coming updates 273 * are going to be part of the replaying edits from WAL 274 */ 275 @Override 276 public void startReplayingFromWAL() { 277 inWalReplay = true; 278 } 279 280 /** 281 * This message intends to inform the MemStore that the replaying edits from WAL 282 * are done 283 */ 284 @Override 285 public void stopReplayingFromWAL() { 286 inWalReplay = false; 287 } 288 289 // the getSegments() method is used for tests only 290 @VisibleForTesting 291 @Override 292 protected List<Segment> getSegments() { 293 List<? extends Segment> pipelineList = pipeline.getSegments(); 294 List<Segment> list = new ArrayList<>(pipelineList.size() + 2); 295 list.add(this.active); 296 list.addAll(pipelineList); 297 list.addAll(this.snapshot.getAllSegments()); 298 299 return list; 300 } 301 302 // the following three methods allow to manipulate the settings of composite snapshot 303 public void setCompositeSnapshot(boolean useCompositeSnapshot) { 304 this.compositeSnapshot = useCompositeSnapshot; 305 } 306 307 public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result, 308 boolean merge) { 309 // last true stands for updating the region size 310 return pipeline.swap(versionedList, result, !merge, true); 311 } 312 313 /** 314 * @param requesterVersion The caller must hold the VersionedList of the pipeline 315 * with version taken earlier. This version must be passed as a parameter here. 316 * The flattening happens only if versions match. 317 */ 318 public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) { 319 pipeline.flattenOneSegment(requesterVersion, indexType, action); 320 } 321 322 // setter is used only for testability 323 @VisibleForTesting 324 void setIndexType(IndexType type) { 325 indexType = type; 326 // Because this functionality is for testing only and tests are setting in-memory flush size 327 // according to their need, there is no setting of in-memory flush size, here. 328 // If it is needed, please change in-memory flush size explicitly 329 } 330 331 public IndexType getIndexType() { 332 return indexType; 333 } 334 335 public boolean hasImmutableSegments() { 336 return !pipeline.isEmpty(); 337 } 338 339 public VersionedSegmentsList getImmutableSegments() { 340 return pipeline.getVersionedList(); 341 } 342 343 public long getSmallestReadPoint() { 344 return store.getSmallestReadPoint(); 345 } 346 347 public HStore getStore() { 348 return store; 349 } 350 351 public String getFamilyName() { 352 return Bytes.toString(getFamilyNameInBytes()); 353 } 354 355 @Override 356 public List<KeyValueScanner> getScanners(long readPt) throws IOException { 357 MutableSegment activeTmp = active; 358 List<? extends Segment> pipelineList = pipeline.getSegments(); 359 List<? extends Segment> snapshotList = snapshot.getAllSegments(); 360 long numberOfSegments = 1L + pipelineList.size() + snapshotList.size(); 361 // The list of elements in pipeline + the active element + the snapshot segment 362 List<KeyValueScanner> list = createList((int) numberOfSegments); 363 addToScanners(activeTmp, readPt, list); 364 addToScanners(pipelineList, readPt, list); 365 addToScanners(snapshotList, readPt, list); 366 return list; 367 } 368 369 @VisibleForTesting 370 protected List<KeyValueScanner> createList(int capacity) { 371 return new ArrayList<>(capacity); 372 } 373 374 /** 375 * Check whether anything need to be done based on the current active set size. 376 * The method is invoked upon every addition to the active set. 377 * For CompactingMemStore, flush the active set to the read-only memory if it's 378 * size is above threshold 379 */ 380 @Override 381 protected void checkActiveSize() { 382 if (shouldFlushInMemory()) { 383 /* The thread is dispatched to flush-in-memory. This cannot be done 384 * on the same thread, because for flush-in-memory we require updatesLock 385 * in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock 386 * in the shared mode. */ 387 InMemoryFlushRunnable runnable = new InMemoryFlushRunnable(); 388 if (LOG.isTraceEnabled()) { 389 LOG.trace( 390 "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName()); 391 } 392 getPool().execute(runnable); 393 } 394 } 395 396 // internally used method, externally visible only for tests 397 // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock, 398 // otherwise there is a deadlock 399 @VisibleForTesting 400 void flushInMemory() throws IOException { 401 // setting the inMemoryFlushInProgress flag again for the case this method is invoked 402 // directly (only in tests) in the common path setting from true to true is idempotent 403 inMemoryFlushInProgress.set(true); 404 try { 405 // Phase I: Update the pipeline 406 getRegionServices().blockUpdates(); 407 try { 408 LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline"); 409 pushActiveToPipeline(this.active); 410 } finally { 411 getRegionServices().unblockUpdates(); 412 } 413 414 // Used by tests 415 if (!allowCompaction.get()) { 416 return; 417 } 418 // Phase II: Compact the pipeline 419 try { 420 // Speculative compaction execution, may be interrupted if flush is forced while 421 // compaction is in progress 422 compactor.start(); 423 } catch (IOException e) { 424 LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}", 425 getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e); 426 } 427 } finally { 428 inMemoryFlushInProgress.set(false); 429 LOG.trace("IN-MEMORY FLUSH: end"); 430 } 431 } 432 433 private Segment getLastSegment() { 434 Segment localActive = getActive(); 435 Segment tail = pipeline.getTail(); 436 return tail == null ? localActive : tail; 437 } 438 439 private byte[] getFamilyNameInBytes() { 440 return store.getColumnFamilyDescriptor().getName(); 441 } 442 443 private ThreadPoolExecutor getPool() { 444 return getRegionServices().getInMemoryCompactionPool(); 445 } 446 447 @VisibleForTesting 448 protected boolean shouldFlushInMemory() { 449 if (this.active.getDataSize() > inmemoryFlushSize) { // size above flush threshold 450 if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush 451 return false; // regardless the size 452 } 453 // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude 454 // the insert of the active into the compaction pipeline 455 return (inMemoryFlushInProgress.compareAndSet(false,true)); 456 } 457 return false; 458 } 459 460 /** 461 * The request to cancel the compaction asynchronous task (caused by in-memory flush) 462 * The compaction may still happen if the request was sent too late 463 * Non-blocking request 464 */ 465 private void stopCompaction() { 466 if (inMemoryFlushInProgress.get()) { 467 compactor.stop(); 468 } 469 } 470 471 protected void pushActiveToPipeline(MutableSegment active) { 472 if (!active.isEmpty()) { 473 pipeline.pushHead(active); 474 resetActive(); 475 } 476 } 477 478 private void pushTailToSnapshot() { 479 VersionedSegmentsList segments = pipeline.getVersionedTail(); 480 pushToSnapshot(segments.getStoreSegments()); 481 // In Swap: don't close segments (they are in snapshot now) and don't update the region size 482 pipeline.swap(segments,null,false, false); 483 } 484 485 private void pushPipelineToSnapshot() { 486 int iterationsCnt = 0; 487 boolean done = false; 488 while (!done) { 489 iterationsCnt++; 490 VersionedSegmentsList segments = pipeline.getVersionedList(); 491 pushToSnapshot(segments.getStoreSegments()); 492 // swap can return false in case the pipeline was updated by ongoing compaction 493 // and the version increase, the chance of it happenning is very low 494 // In Swap: don't close segments (they are in snapshot now) and don't update the region size 495 done = pipeline.swap(segments, null, false, false); 496 if (iterationsCnt>2) { 497 // practically it is impossible that this loop iterates more than two times 498 // (because the compaction is stopped and none restarts it while in snapshot request), 499 // however stopping here for the case of the infinite loop causing by any error 500 LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," + 501 " while flushing to disk."); 502 this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator()); 503 break; 504 } 505 } 506 } 507 508 private void pushToSnapshot(List<ImmutableSegment> segments) { 509 if(segments.isEmpty()) return; 510 if(segments.size() == 1 && !segments.get(0).isEmpty()) { 511 this.snapshot = segments.get(0); 512 return; 513 } else { // create composite snapshot 514 this.snapshot = 515 SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments); 516 } 517 } 518 519 private RegionServicesForStores getRegionServices() { 520 return regionServices; 521 } 522 523 /** 524 * The in-memory-flusher thread performs the flush asynchronously. 525 * There is at most one thread per memstore instance. 526 * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock 527 * and compacts the pipeline. 528 */ 529 private class InMemoryFlushRunnable implements Runnable { 530 531 @Override 532 public void run() { 533 try { 534 flushInMemory(); 535 } catch (IOException e) { 536 LOG.warn("Unable to run memstore compaction. region " 537 + getRegionServices().getRegionInfo().getRegionNameAsString() 538 + "store: "+ getFamilyName(), e); 539 } 540 } 541 } 542 543 @VisibleForTesting 544 boolean isMemStoreFlushingInMemory() { 545 return inMemoryFlushInProgress.get(); 546 } 547 548 /** 549 * @param cell Find the row that comes after this one. If null, we return the 550 * first. 551 * @return Next row or null if none found. 552 */ 553 Cell getNextRow(final Cell cell) { 554 Cell lowest = null; 555 List<Segment> segments = getSegments(); 556 for (Segment segment : segments) { 557 if (lowest == null) { 558 lowest = getNextRow(cell, segment.getCellSet()); 559 } else { 560 lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet())); 561 } 562 } 563 return lowest; 564 } 565 566 @VisibleForTesting 567 long getInmemoryFlushSize() { 568 return inmemoryFlushSize; 569 } 570 571 // debug method 572 public void debug() { 573 String msg = "active size=" + this.active.getDataSize(); 574 msg += " in-memory flush size is "+ inmemoryFlushSize; 575 msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false"); 576 msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false"); 577 LOG.debug(msg); 578 } 579 580}