/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Flushes the memstore cache on request. NOTE: The flush handlers extend Thread rather than
 * running as a Chore because the sleep time can be interrupted when there is something to do,
 * whereas the Chore sleep time is invariant.
 * @see FlushRequester
 */
@InterfaceAudience.Private
public class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  private Configuration conf;
  // These two data members go together. Any entry in the one must have
  // a corresponding entry in the other.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  private final long threadWakeFrequency;
  private final HRegionServer server;
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Object blockSignal = new Object();

  private long blockingWaitTime;
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  private FlushHandler[] flushHandlers;

  private final AtomicInteger flusherIdGen = new AtomicInteger();

  private ThreadFactory flusherThreadFactory;

  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);

  /**
   * Singleton instance inserted into flush queue used for signaling.
   */
  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

    @Override
    public int compareTo(Delayed o) {
      return -1;
    }

    @Override
    public boolean equals(Object obj) {
      return obj == this;
    }

    @Override
    public int hashCode() {
      return 42;
    }
  };

  /**
   * @param conf   the server configuration
   * @param server the region server that owns this flusher; may be null
   */
  public MemStoreFlusher(final Configuration conf, final HRegionServer server) {
    super();
    this.conf = conf;
    this.server = server;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
    int handlerCount = 0;
    if (server != null) {
      handlerCount = getHandlerCount(conf);
      LOG.info("globalMemStoreLimit="
        + TraditionalBinaryPrefix
          .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
        + ", globalMemStoreLimitLowMark="
        + TraditionalBinaryPrefix.long2String(
          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
        + ", Offheap=" + (this.server.getRegionServerAccounting().isOffheap()));
    }
    this.flushHandlers = new FlushHandler[handlerCount];
  }

  public LongAdder getUpdatesBlockedMsHighWater() {
    return this.updatesBlockedMsHighWater;
  }

  /**
   * The memstore across all regions has exceeded the low water mark. Pick one region to flush and
   * flush it synchronously (this is called from the flush thread).
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure(FlushType flushType) {
    SortedMap<Long, Collection<HRegion>> regionsBySize = null;
    switch (flushType) {
      case ABOVE_OFFHEAP_HIGHER_MARK:
      case ABOVE_OFFHEAP_LOWER_MARK:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
        break;
      case ABOVE_ONHEAP_HIGHER_MARK:
      case ABOVE_ONHEAP_LOWER_MARK:
      default:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
    }
    Set<HRegion> excludedRegions = new HashSet<>();

    double secondaryMultiplier =
      ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles (might be null!)
      HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
      // Find the biggest region, total, even if it might have too many store files.
      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
      // Find the biggest region that is a secondary region
      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
      if (bestAnyRegion == null) {
        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
        bestAnyRegion = bestRegionReplica;
      }
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;
      long bestAnyRegionSize;
      long bestFlushableRegionSize;
      switch (flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
          break;

        default:
          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
      }
      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " + "Region "
            + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many "
            + "store files, but is " + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
            + " vs best flushable region's "
            + TraditionalBinaryPrefix.long2String(bestFlushableRegionSize, "", 1)
            + ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      long regionToFlushSize;
      long bestRegionReplicaSize;
      switch (flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
          break;

        default:
          regionToFlushSize = regionToFlush.getMemStoreDataSize();
          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
      }

      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
        // A concurrency issue (such as splitting region) may happen such that the online region
        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
        // getBiggestMemStoreRegion(). This means that we can come out of the loop.
        LOG.debug("Above memory mark but there is no flushable region");
        return false;
      }

      if (
        regionToFlush == null || (bestRegionReplica != null
          && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf)
          && (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))
      ) {
        LOG.info("Refreshing storefiles of region " + bestRegionReplica
          + " due to global heap pressure. Total memstore off heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
          + " memstore heap size=" + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
        if (!flushedOne) {
          LOG.info("Excluding secondary region " + bestRegionReplica
            + " - trying to find a different region to refresh files.");
          excludedRegions.add(bestRegionReplica);
        }
      } else {
        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
          + "Flush type=" + flushType.toString() + ", Total Memstore Heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)
          + ", Total Memstore Off-Heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
          + ", Region memstore size="
          + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);

        if (!flushedOne) {
          LOG.info("Excluding unflushable region " + regionToFlush
            + " - trying to find a different region to flush.");
          excludedRegions.add(regionToFlush);
        }
      }
    }
    return true;
  }

  /** Returns the memstore off-heap size of <code>r</code>, or 0 if <code>r</code> is null. */
  private static long getMemStoreOffHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreOffHeapSize();
  }

  /** Returns the memstore heap size of <code>r</code>, or 0 if <code>r</code> is null. */
  private static long getMemStoreHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreHeapSize();
  }

  /** Returns the memstore data size of <code>r</code>, or 0 if <code>r</code> is null. */
  private static long getMemStoreDataSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreDataSize();
  }

  private class FlushHandler extends Thread {

    private final AtomicBoolean running = new AtomicBoolean(true);

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped() && running.get()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
            FlushType type = isAboveLowWaterMark();
            if (type != FlushType.NORMAL) {
              LOG.debug("Flush thread woke up because memory above low water="
                + TraditionalBinaryPrefix.long2String(
                  server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
              // For offheap memstore, even if the lower water mark was breached due to heap
              // overhead, we still select the regions based on the region's memstore data size.
              // TODO: If we want to decide based on heap overhead it can be done without tracking
              // it per region.
              if (!flushOneForGlobalPressure(type)) {
                // Wasn't able to flush any region, but we're above low water mark.
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }

      if (server.isStopped()) {
        synchronized (regionsInQueue) {
          regionsInQueue.clear();
          flushQueue.clear();
        }

        // Signal anyone waiting, so they see the close flag
        wakeUpIfBlocking();
      }
      LOG.info(getName() + " exiting");
    }

    public void shutdown() {
      if (!running.compareAndSet(true, false)) {
        LOG.warn("{} is already signaled to shutdown", getName());
      }
    }
  }

  private void wakeupFlushThread() {
    if (wakeupPending.compareAndSet(false, true)) {
      flushQueue.add(WAKEUPFLUSH_INSTANCE);
    }
  }

  private HRegion getBiggestMemStoreRegion(SortedMap<Long, Collection<HRegion>> regionsBySize,
    Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
    synchronized (regionsInQueue) {
      for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
        for (HRegion region : entry.getValue()) {
          if (excludedRegions.contains(region)) {
            continue;
          }

          if (region.writestate.flushing || !region.writestate.writesEnabled) {
            continue;
          }

          if (checkStoreFileCount && isTooManyStoreFiles(region)) {
            continue;
          }
          return region;
        }
      }
    }
    return null;
  }

  private HRegion getBiggestMemStoreOfRegionReplica(
    SortedMap<Long, Collection<HRegion>> regionsBySize, Set<HRegion> excludedRegions) {
    synchronized (regionsInQueue) {
      for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
        for (HRegion region : entry.getValue()) {
          if (excludedRegions.contains(region)) {
            continue;
          }

          if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
            continue;
          }
          return region;
        }
      }
    }
    return null;
  }

  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
    try {
      return region.refreshStoreFiles();
    } catch (IOException e) {
      LOG.warn("Refreshing store files failed with exception", e);
    }
    return false;
  }

  /**
   * Return the FlushType if global memory usage is above the high watermark
   */
  private FlushType isAboveHighWaterMark() {
    return server.getRegionServerAccounting().isAboveHighWaterMark();
  }

  /**
   * Return the FlushType if we're above the low watermark
   */
  private FlushType isAboveLowWaterMark() {
    return server.getRegionServerAccounting().isAboveLowWaterMark();
  }

  @Override
  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
    return this.requestFlush(r, null, tracker);
  }

  @Override
  public boolean requestFlush(HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
    synchronized (regionsInQueue) {
      FlushRegionEntry existFqe = regionsInQueue.get(r);
      if (existFqe != null) {
        // if a delayed entry exists and has not yet reached its execution time, just remove it
        if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
          LOG.info("Remove the existing delayed flush entry for {}, "
            + "because we need to flush it immediately", r);
          this.regionsInQueue.remove(r);
          this.flushQueue.remove(existFqe);
          r.decrementFlushesQueuedCount();
        } else {
          tracker.notExecuted("Flush already requested on " + r);
          return false;
        }
      }

      // This entry has no delay so it will be added at the top of the flush
      // queue. It'll come out near immediately.
      FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
      this.regionsInQueue.put(r, fqe);
      this.flushQueue.add(fqe);
      r.incrementFlushesQueuedCount();
      return true;
    }
  }

  @Override
  public boolean requestDelayedFlush(HRegion r, long delay) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
        FlushRegionEntry fqe = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
        r.incrementFlushesQueuedCount();
        return true;
      }
      return false;
    }
  }

  public int getFlushQueueSize() {
    return flushQueue.size();
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    lock.writeLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null) {
          flushHandler.interrupt();
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  synchronized void start(UncaughtExceptionHandler eh) {
    this.flusherThreadFactory =
      new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build();
    lock.readLock().lock();
    try {
      startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length);
    } finally {
      lock.readLock().unlock();
    }
  }

  boolean isAlive() {
    lock.readLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null && flushHandler.isAlive()) {
          return true;
        }
      }
      return false;
    } finally {
      lock.readLock().unlock();
    }
  }

  void shutdown() {
    lock.readLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null) {
          Threads.shutdown(flushHandler);
        }
      }
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * A flushRegion that checks store file count. If too many, puts the flush on the delay queue to
   * retry later.
   * @return true if the region was successfully flushed, false otherwise. If false, there will be
   *         accompanying log messages explaining why the region was not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime)
          + "ms on a compaction to clean up 'too many store files'; waited "
          + "long enough... proceeding with flush of "
          + region.getRegionInfo().getRegionNameAsString());
      } else {
        // If this is the first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
            region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
            this.blockingWaitTime);
          final CompactSplit compactSplitThread = server.getCompactSplitThread();
          if (!compactSplitThread.requestSplit(region)) {
            try {
              compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
            } catch (IOException e) {
              e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
              LOG.error("Cache flush failed for region "
                + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
            }
          }
        }

        // Put back on the queue. Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    return flushRegion(region, false, fqe.families, fqe.getTracker());
  }

  /**
   * Flush a region.
   * @param region         Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region needs to be
   *                       removed from the flush queue. If false, we were called from the main
   *                       flusher run loop and got the entry to flush by polling the flush queue
   *                       (which removed it).
   * @param families       stores of region to flush.
   * @return true if the region was successfully flushed, false otherwise. If false, there will be
   *         accompanying log messages explaining why the region was not flushed.
   */
  private boolean flushRegion(HRegion region, boolean emergencyFlush, List<byte[]> families,
    FlushLifeCycleTracker tracker) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null && emergencyFlush) {
        // Need to remove the region from the delay queue. When NOT an
        // emergencyFlush, the item was removed via flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }

    tracker.beforeExecution();
    lock.readLock().lock();
    final CompactSplit compactSplitThread = server.getCompactSplitThread();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flushcache(families, false, tracker);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit().isPresent();
      if (shouldSplit) {
        compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of WAL
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error("Cache flush failed" + (region != null
        ? (" for region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
        : ""), ex);
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
      tracker.afterExecution();
    }
    return true;
  }

  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
    FlushType type = null;
    if (emergencyFlush) {
      type = isAboveHighWaterMark();
    }
    if (type == null) {
      type = isAboveLowWaterMark();
    }
    for (FlushRequestListener listener : flushRequestListeners) {
      listener.flushRequested(type, region);
    }
  }

  private void wakeUpIfBlocking() {
    synchronized (blockSignal) {
      blockSignal.notifyAll();
    }
  }

  private boolean isTooManyStoreFiles(Region region) {

    // When compaction is disabled, the region is flushable
    if (!region.getTableDescriptor().isCompactionEnabled()) {
      return false;
    }

    for (Store store : region.getStores()) {
      if (store.hasTooManyStoreFiles()) {
        return true;
      }
    }
    return false;
  }

  private int getStoreFileCount(Region region) {
    int count = 0;
    for (Store store : region.getStores()) {
      count += store.getStorefilesCount();
    }
    return count;
  }

  /**
   * Check if the regionserver's memstore memory usage is greater than the limit. If so, flush
   * regions with the biggest memstores until we're down to the lower limit. This method blocks
   * callers until we're down to a safe amount of memstore consumption.
   */
  public void reclaimMemStoreMemory() {
    Span span =
      TraceUtil.getGlobalTracer().spanBuilder("MemStoreFlusher.reclaimMemStoreMemory").startSpan();
    try (Scope scope = span.makeCurrent()) {
      FlushType flushType = isAboveHighWaterMark();
      if (flushType != FlushType.NORMAL) {
        span.addEvent("Force Flush. We're above high water mark.");
        long start = EnvironmentEdgeManager.currentTime();
        long nextLogTimeMs = start;
        synchronized (this.blockSignal) {
          boolean blocked = false;
          long startTime = 0;
          boolean interrupted = false;
          try {
            flushType = isAboveHighWaterMark();
            while (flushType != FlushType.NORMAL && !server.isStopped()) {
              if (!blocked) {
                startTime = EnvironmentEdgeManager.currentTime();
                if (!server.getRegionServerAccounting().isOffheap()) {
                  logMsg("global memstore heapsize",
                    server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                    server.getRegionServerAccounting().getGlobalMemStoreLimit());
                } else {
                  switch (flushType) {
                    case ABOVE_OFFHEAP_HIGHER_MARK:
                      logMsg("the global offheap memstore datasize",
                        server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
                        server.getRegionServerAccounting().getGlobalMemStoreLimit());
                      break;
                    case ABOVE_ONHEAP_HIGHER_MARK:
                      logMsg("global memstore heapsize",
                        server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                        server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
                      break;
                    default:
                      break;
                  }
                }
              }
              blocked = true;
              wakeupFlushThread();
              try {
                // we should be able to wait forever, but we've seen a bug where
                // we miss a notify, so put a 5 second bound on it at least.
                blockSignal.wait(5 * 1000);
              } catch (InterruptedException ie) {
                LOG.warn("Interrupted while waiting");
                interrupted = true;
              }
              long nowMs = EnvironmentEdgeManager.currentTime();
              if (nowMs >= nextLogTimeMs) {
                LOG.warn("Memstore is above high water mark and has blocked updates for {} ms",
                  nowMs - start);
                nextLogTimeMs = nowMs + 1000;
              }
              flushType = isAboveHighWaterMark();
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }

          if (blocked) {
            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
            if (totalTime > 0) {
              this.updatesBlockedMsHighWater.add(totalTime);
            }
            LOG.info("Unblocking updates for server " + server.toString());
          }
        }
      } else {
        flushType = isAboveLowWaterMark();
        if (flushType != FlushType.NORMAL) {
          wakeupFlushThread();
        }
        span.end();
      }
    }
  }

  private void logMsg(String type, long val, long max) {
    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
      TraditionalBinaryPrefix.long2String(val, "", 1),
      TraditionalBinaryPrefix.long2String(max, "", 1));
  }

  @Override
  public String toString() {
    return "flush_queue=" + flushQueue.size();
  }

  public String dumpQueue() {
    StringBuilder queueList = new StringBuilder();
    queueList.append("Flush Queue dump:\n");
    queueList.append("  Flush Queue:\n");
    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();

    while (it.hasNext()) {
      queueList.append("  " + it.next().toString());
      queueList.append("\n");
    }

    return queueList.toString();
  }

  /**
   * Register a MemstoreFlushListener
   */
  @Override
  public void registerFlushRequestListener(final FlushRequestListener listener) {
    this.flushRequestListeners.add(listener);
  }

  /**
   * Unregister the listener from MemstoreFlushListeners
   * @return true when the passed listener is unregistered successfully.
   */
  @Override
  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
    return this.flushRequestListeners.remove(listener);
  }

  /**
   * Sets the global memstore limit to a new size.
   */
  @Override
  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
    reclaimMemStoreMemory();
  }

  interface FlushQueueEntry extends Delayed {
  }

  /**
   * Data structure used in the flush queue. Holds region and retry count. Keeps tabs on how old
   * this object is. Implements {@link Delayed}. On construction, the delay is zero. When added to
   * a delay queue, we'll come out near immediately. Call {@link #requeue(long)} passing delay in
   * milliseconds before re-adding to the delay queue if you want it to stay there a while.
   */
  static class FlushRegionEntry implements FlushQueueEntry {
    private final HRegion region;

    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;

    private final List<byte[]> families;

    private final FlushLifeCycleTracker tracker;

    FlushRegionEntry(final HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
      this.families = families;
      this.tracker = tracker;
    }

    /** Returns true if we have been delayed > <code>maximumWait</code> milliseconds. */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }

    /** Returns true if the entry is a delayed flush task */
    protected boolean isDelay() {
      return this.whenToExpire > this.createTime;
    }

    /**
     * @return Count of times {@link #requeue(long)} was called; i.e. this is the number of times
     *         we've been requeued.
     */
    public int getRequeueCount() {
      return this.requeueCount;
    }

    public FlushLifeCycleTracker getTracker() {
      return tracker;
    }

    /**
     * @param when When to expire, when to come up out of the queue. Specify in milliseconds. This
     *             method adds EnvironmentEdgeManager.currentTime() to whatever you pass.
     * @return This.
     */
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
        TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      // Delay is compared first. If there is a tie, compare the regions' hash codes.
      int ret =
        Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS))
          .intValue();
      if (ret != 0) {
        return ret;
      }
      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
      return hashCode() - otherEntry.hashCode();
    }

    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
    }

    @Override
    public int hashCode() {
      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
      return hash ^ region.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      FlushRegionEntry other = (FlushRegionEntry) obj;
      if (
        !Bytes.equals(this.region.getRegionInfo().getRegionName(),
          other.region.getRegionInfo().getRegionName())
      ) {
        return false;
      }
      return compareTo(other) == 0;
    }
  }

  private int getHandlerCount(Configuration conf) {
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    if (handlerCount < 1) {
      LOG.warn(
        "hbase.hstore.flusher.count was configured to {} which is less than 1, corrected to 1",
        handlerCount);
      handlerCount = 1;
    }
    return handlerCount;
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    int newHandlerCount = getHandlerCount(newConf);
    if (newHandlerCount != flushHandlers.length) {
      LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length,
        newHandlerCount);
      lock.writeLock().lock();
      try {
        FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount);
        if (newHandlerCount > flushHandlers.length) {
          startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length);
        } else {
          stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length);
        }
        flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length);
        this.flushHandlers = newFlushHandlers;
      } finally {
        lock.writeLock().unlock();
      }
    }
  }

  private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
    if (flusherThreadFactory != null) {
      for (int i = start; i < end; i++) {
        flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement());
        flusherThreadFactory.newThread(flushHandlers[i]);
        flushHandlers[i].start();
      }
    }
  }

  private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
    for (int i = start; i < end; i++) {
      flushHandlers[i].shutdown();
      if (LOG.isDebugEnabled()) {
        LOG.debug("send shutdown signal to {}", flushHandlers[i].getName());
      }
    }
  }

  public int getFlusherCount() {
    return flusherIdGen.get();
  }
}