001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import io.opentelemetry.api.trace.Span; 021import io.opentelemetry.context.Scope; 022import java.io.IOException; 023import java.lang.Thread.UncaughtExceptionHandler; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.ConcurrentModificationException; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.SortedMap; 033import java.util.concurrent.BlockingQueue; 034import java.util.concurrent.DelayQueue; 035import java.util.concurrent.Delayed; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicBoolean; 039import java.util.concurrent.atomic.LongAdder; 040import java.util.concurrent.locks.ReentrantReadWriteLock; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.hbase.DroppedSnapshotException; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.client.RegionReplicaUtil; 045import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 046import org.apache.hadoop.hbase.trace.TraceUtil; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 050import org.apache.hadoop.hbase.util.Threads; 051import org.apache.hadoop.ipc.RemoteException; 052import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 058 059/** 060 * Thread that flushes cache on request NOTE: This class extends Thread rather than Chore because 061 * the sleep time can be interrupted when there is something to do, rather than the Chore sleep time 062 * which is invariant. 063 * @see FlushRequester 064 */ 065@InterfaceAudience.Private 066public class MemStoreFlusher implements FlushRequester { 067 private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class); 068 069 private Configuration conf; 070 // These two data members go together. Any entry in the one must have 071 // a corresponding entry in the other. 072 private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>(); 073 protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>(); 074 private AtomicBoolean wakeupPending = new AtomicBoolean(); 075 076 private final long threadWakeFrequency; 077 private final HRegionServer server; 078 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 079 private final Object blockSignal = new Object(); 080 081 private long blockingWaitTime; 082 private final LongAdder updatesBlockedMsHighWater = new LongAdder(); 083 084 private final FlushHandler[] flushHandlers; 085 private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1); 086 087 /** 088 * Singleton instance inserted into flush queue used for signaling. 089 */ 090 private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() { 091 @Override 092 public long getDelay(TimeUnit unit) { 093 return 0; 094 } 095 096 @Override 097 public int compareTo(Delayed o) { 098 return -1; 099 } 100 101 @Override 102 public boolean equals(Object obj) { 103 return obj == this; 104 } 105 106 @Override 107 public int hashCode() { 108 return 42; 109 } 110 }; 111 112 /** 113 * nn 114 */ 115 public MemStoreFlusher(final Configuration conf, final HRegionServer server) { 116 super(); 117 this.conf = conf; 118 this.server = server; 119 this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 120 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); 121 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); 122 if (server != null) { 123 if (handlerCount < 1) { 124 LOG.warn( 125 "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1", 126 handlerCount); 127 handlerCount = 1; 128 } 129 LOG.info("globalMemStoreLimit=" 130 + TraditionalBinaryPrefix 131 .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) 132 + ", globalMemStoreLimitLowMark=" 133 + TraditionalBinaryPrefix.long2String( 134 this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) 135 + ", Offheap=" + (this.server.getRegionServerAccounting().isOffheap())); 136 } 137 this.flushHandlers = new FlushHandler[handlerCount]; 138 } 139 140 public LongAdder getUpdatesBlockedMsHighWater() { 141 return this.updatesBlockedMsHighWater; 142 } 143 144 /** 145 * The memstore across all regions has exceeded the low water mark. Pick one region to flush and 146 * flush it synchronously (this is called from the flush thread) 147 * @return true if successful 148 */ 149 private boolean flushOneForGlobalPressure(FlushType flushType) { 150 SortedMap<Long, Collection<HRegion>> regionsBySize = null; 151 switch (flushType) { 152 case ABOVE_OFFHEAP_HIGHER_MARK: 153 case ABOVE_OFFHEAP_LOWER_MARK: 154 regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize(); 155 break; 156 case ABOVE_ONHEAP_HIGHER_MARK: 157 case ABOVE_ONHEAP_LOWER_MARK: 158 default: 159 regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize(); 160 } 161 Set<HRegion> excludedRegions = new HashSet<>(); 162 163 double secondaryMultiplier = 164 ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf); 165 166 boolean flushedOne = false; 167 while (!flushedOne) { 168 // Find the biggest region that doesn't have too many storefiles (might be null!) 169 HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true); 170 // Find the biggest region, total, even if it might have too many flushes. 171 HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false); 172 // Find the biggest region that is a secondary region 173 HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions); 174 if (bestAnyRegion == null) { 175 // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null 176 bestAnyRegion = bestRegionReplica; 177 } 178 if (bestAnyRegion == null) { 179 LOG.error("Above memory mark but there are no flushable regions!"); 180 return false; 181 } 182 183 HRegion regionToFlush; 184 long bestAnyRegionSize; 185 long bestFlushableRegionSize; 186 switch (flushType) { 187 case ABOVE_OFFHEAP_HIGHER_MARK: 188 case ABOVE_OFFHEAP_LOWER_MARK: 189 bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize(); 190 bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion); 191 break; 192 193 case ABOVE_ONHEAP_HIGHER_MARK: 194 case ABOVE_ONHEAP_LOWER_MARK: 195 bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize(); 196 bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion); 197 break; 198 199 default: 200 bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize(); 201 bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion); 202 } 203 if (bestAnyRegionSize > 2 * bestFlushableRegionSize) { 204 // Even if it's not supposed to be flushed, pick a region if it's more than twice 205 // as big as the best flushable one - otherwise when we're under pressure we make 206 // lots of little flushes and cause lots of compactions, etc, which just makes 207 // life worse! 208 if (LOG.isDebugEnabled()) { 209 LOG.debug("Under global heap pressure: " + "Region " 210 + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many " 211 + "store files, but is " + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1) 212 + " vs best flushable region's " 213 + TraditionalBinaryPrefix.long2String(bestFlushableRegionSize, "", 1) 214 + ". Choosing the bigger."); 215 } 216 regionToFlush = bestAnyRegion; 217 } else { 218 if (bestFlushableRegion == null) { 219 regionToFlush = bestAnyRegion; 220 } else { 221 regionToFlush = bestFlushableRegion; 222 } 223 } 224 225 long regionToFlushSize; 226 long bestRegionReplicaSize; 227 switch (flushType) { 228 case ABOVE_OFFHEAP_HIGHER_MARK: 229 case ABOVE_OFFHEAP_LOWER_MARK: 230 regionToFlushSize = regionToFlush.getMemStoreOffHeapSize(); 231 bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica); 232 break; 233 234 case ABOVE_ONHEAP_HIGHER_MARK: 235 case ABOVE_ONHEAP_LOWER_MARK: 236 regionToFlushSize = regionToFlush.getMemStoreHeapSize(); 237 bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica); 238 break; 239 240 default: 241 regionToFlushSize = regionToFlush.getMemStoreDataSize(); 242 bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica); 243 } 244 245 if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) { 246 // A concurrency issue (such as splitting region) may happen such that the online region 247 // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to 248 // getBiggestMemStoreRegion(). This means that we can come out of the loop 249 LOG.debug("Above memory mark but there is no flushable region"); 250 return false; 251 } 252 253 if ( 254 regionToFlush == null || (bestRegionReplica != null 255 && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) 256 && (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize)) 257 ) { 258 LOG.info("Refreshing storefiles of region " + bestRegionReplica 259 + " due to global heap pressure. Total memstore off heap size=" 260 + TraditionalBinaryPrefix 261 .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) 262 + " memstore heap size=" + TraditionalBinaryPrefix 263 .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)); 264 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); 265 if (!flushedOne) { 266 LOG.info("Excluding secondary region " + bestRegionReplica 267 + " - trying to find a different region to refresh files."); 268 excludedRegions.add(bestRegionReplica); 269 } 270 } else { 271 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " 272 + "Flush type=" + flushType.toString() + ", Total Memstore Heap size=" 273 + TraditionalBinaryPrefix 274 .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) 275 + ", Total Memstore Off-Heap size=" 276 + TraditionalBinaryPrefix 277 .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) 278 + ", Region memstore size=" 279 + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1)); 280 flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY); 281 282 if (!flushedOne) { 283 LOG.info("Excluding unflushable region " + regionToFlush 284 + " - trying to find a different region to flush."); 285 excludedRegions.add(regionToFlush); 286 } 287 } 288 } 289 return true; 290 } 291 292 /** 293 * @return Return memstore offheap size or null if <code>r</code> is null 294 */ 295 private static long getMemStoreOffHeapSize(HRegion r) { 296 return r == null ? 0 : r.getMemStoreOffHeapSize(); 297 } 298 299 /** 300 * @return Return memstore heap size or null if <code>r</code> is null 301 */ 302 private static long getMemStoreHeapSize(HRegion r) { 303 return r == null ? 0 : r.getMemStoreHeapSize(); 304 } 305 306 /** 307 * @return Return memstore data size or null if <code>r</code> is null 308 */ 309 private static long getMemStoreDataSize(HRegion r) { 310 return r == null ? 0 : r.getMemStoreDataSize(); 311 } 312 313 private class FlushHandler extends Thread { 314 315 private FlushHandler(String name) { 316 super(name); 317 } 318 319 @Override 320 public void run() { 321 while (!server.isStopped()) { 322 FlushQueueEntry fqe = null; 323 try { 324 wakeupPending.set(false); // allow someone to wake us up again 325 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); 326 if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) { 327 FlushType type = isAboveLowWaterMark(); 328 if (type != FlushType.NORMAL) { 329 LOG.debug("Flush thread woke up because memory above low water=" 330 + TraditionalBinaryPrefix.long2String( 331 server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)); 332 // For offheap memstore, even if the lower water mark was breached due to heap 333 // overhead 334 // we still select the regions based on the region's memstore data size. 335 // TODO : If we want to decide based on heap over head it can be done without tracking 336 // it per region. 337 if (!flushOneForGlobalPressure(type)) { 338 // Wasn't able to flush any region, but we're above low water mark 339 // This is unlikely to happen, but might happen when closing the 340 // entire server - another thread is flushing regions. We'll just 341 // sleep a little bit to avoid spinning, and then pretend that 342 // we flushed one, so anyone blocked will check again 343 Thread.sleep(1000); 344 wakeUpIfBlocking(); 345 } 346 // Enqueue another one of these tokens so we'll wake up again 347 wakeupFlushThread(); 348 } 349 continue; 350 } 351 FlushRegionEntry fre = (FlushRegionEntry) fqe; 352 if (!flushRegion(fre)) { 353 break; 354 } 355 } catch (InterruptedException ex) { 356 continue; 357 } catch (ConcurrentModificationException ex) { 358 continue; 359 } catch (Exception ex) { 360 LOG.error("Cache flusher failed for entry " + fqe, ex); 361 if (!server.checkFileSystem()) { 362 break; 363 } 364 } 365 } 366 synchronized (regionsInQueue) { 367 regionsInQueue.clear(); 368 flushQueue.clear(); 369 } 370 371 // Signal anyone waiting, so they see the close flag 372 wakeUpIfBlocking(); 373 LOG.info(getName() + " exiting"); 374 } 375 } 376 377 private void wakeupFlushThread() { 378 if (wakeupPending.compareAndSet(false, true)) { 379 flushQueue.add(WAKEUPFLUSH_INSTANCE); 380 } 381 } 382 383 private HRegion getBiggestMemStoreRegion(SortedMap<Long, Collection<HRegion>> regionsBySize, 384 Set<HRegion> excludedRegions, boolean checkStoreFileCount) { 385 synchronized (regionsInQueue) { 386 for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) { 387 for (HRegion region : entry.getValue()) { 388 if (excludedRegions.contains(region)) { 389 continue; 390 } 391 392 if (region.writestate.flushing || !region.writestate.writesEnabled) { 393 continue; 394 } 395 396 if (checkStoreFileCount && isTooManyStoreFiles(region)) { 397 continue; 398 } 399 return region; 400 } 401 } 402 } 403 return null; 404 } 405 406 private HRegion getBiggestMemStoreOfRegionReplica( 407 SortedMap<Long, Collection<HRegion>> regionsBySize, Set<HRegion> excludedRegions) { 408 synchronized (regionsInQueue) { 409 for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) { 410 for (HRegion region : entry.getValue()) { 411 if (excludedRegions.contains(region)) { 412 continue; 413 } 414 415 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 416 continue; 417 } 418 return region; 419 } 420 } 421 } 422 return null; 423 } 424 425 private boolean refreshStoreFilesAndReclaimMemory(Region region) { 426 try { 427 return region.refreshStoreFiles(); 428 } catch (IOException e) { 429 LOG.warn("Refreshing store files failed with exception", e); 430 } 431 return false; 432 } 433 434 /** 435 * Return the FlushType if global memory usage is above the high watermark 436 */ 437 private FlushType isAboveHighWaterMark() { 438 return server.getRegionServerAccounting().isAboveHighWaterMark(); 439 } 440 441 /** 442 * Return the FlushType if we're above the low watermark 443 */ 444 private FlushType isAboveLowWaterMark() { 445 return server.getRegionServerAccounting().isAboveLowWaterMark(); 446 } 447 448 @Override 449 public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) { 450 return this.requestFlush(r, null, tracker); 451 } 452 453 @Override 454 public boolean requestFlush(HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) { 455 synchronized (regionsInQueue) { 456 FlushRegionEntry existFqe = regionsInQueue.get(r); 457 if (existFqe != null) { 458 // if a delayed one exists and not reach the time to execute, just remove it 459 if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) { 460 LOG.info("Remove the existing delayed flush entry for {}, " 461 + "because we need to flush it immediately", r); 462 this.regionsInQueue.remove(r); 463 this.flushQueue.remove(existFqe); 464 r.decrementFlushesQueuedCount(); 465 } else { 466 tracker.notExecuted("Flush already requested on " + r); 467 return false; 468 } 469 } 470 471 // This entry has no delay so it will be added at the top of the flush 472 // queue. It'll come out near immediately. 473 FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); 474 this.regionsInQueue.put(r, fqe); 475 this.flushQueue.add(fqe); 476 r.incrementFlushesQueuedCount(); 477 return true; 478 } 479 } 480 481 @Override 482 public boolean requestDelayedFlush(HRegion r, long delay) { 483 synchronized (regionsInQueue) { 484 if (!regionsInQueue.containsKey(r)) { 485 // This entry has some delay 486 FlushRegionEntry fqe = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY); 487 fqe.requeue(delay); 488 this.regionsInQueue.put(r, fqe); 489 this.flushQueue.add(fqe); 490 r.incrementFlushesQueuedCount(); 491 return true; 492 } 493 return false; 494 } 495 } 496 497 public int getFlushQueueSize() { 498 return flushQueue.size(); 499 } 500 501 /** 502 * Only interrupt once it's done with a run through the work loop. 503 */ 504 void interruptIfNecessary() { 505 lock.writeLock().lock(); 506 try { 507 for (FlushHandler flushHander : flushHandlers) { 508 if (flushHander != null) flushHander.interrupt(); 509 } 510 } finally { 511 lock.writeLock().unlock(); 512 } 513 } 514 515 synchronized void start(UncaughtExceptionHandler eh) { 516 ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder() 517 .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d") 518 .setDaemon(true).setUncaughtExceptionHandler(eh).build(); 519 for (int i = 0; i < flushHandlers.length; i++) { 520 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i); 521 flusherThreadFactory.newThread(flushHandlers[i]); 522 flushHandlers[i].start(); 523 } 524 } 525 526 boolean isAlive() { 527 for (FlushHandler flushHander : flushHandlers) { 528 if (flushHander != null && flushHander.isAlive()) { 529 return true; 530 } 531 } 532 return false; 533 } 534 535 void join() { 536 for (FlushHandler flushHander : flushHandlers) { 537 if (flushHander != null) { 538 Threads.shutdown(flushHander); 539 } 540 } 541 } 542 543 /** 544 * A flushRegion that checks store file count. If too many, puts the flush on delay queue to retry 545 * later. n * @return true if the region was successfully flushed, false otherwise. If false, 546 * there will be accompanying log messages explaining why the region was not flushed. 547 */ 548 private boolean flushRegion(final FlushRegionEntry fqe) { 549 HRegion region = fqe.region; 550 if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { 551 if (fqe.isMaximumWait(this.blockingWaitTime)) { 552 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) 553 + "ms on a compaction to clean up 'too many store files'; waited " 554 + "long enough... proceeding with flush of " 555 + region.getRegionInfo().getRegionNameAsString()); 556 } else { 557 // If this is first time we've been put off, then emit a log message. 558 if (fqe.getRequeueCount() <= 0) { 559 // Note: We don't impose blockingStoreFiles constraint on meta regions 560 LOG.warn("{} has too many store files({}); delaying flush up to {} ms", 561 region.getRegionInfo().getEncodedName(), getStoreFileCount(region), 562 this.blockingWaitTime); 563 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 564 if (!compactSplitThread.requestSplit(region)) { 565 try { 566 compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); 567 } catch (IOException e) { 568 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 569 LOG.error("Cache flush failed for region " 570 + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e); 571 } 572 } 573 } 574 575 // Put back on the queue. Have it come back out of the queue 576 // after a delay of this.blockingWaitTime / 100 ms. 577 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); 578 // Tell a lie, it's not flushed but it's ok 579 return true; 580 } 581 } 582 return flushRegion(region, false, fqe.families, fqe.getTracker()); 583 } 584 585 /** 586 * Flush a region. 587 * @param region Region to flush. 588 * @param emergencyFlush Set if we are being force flushed. If true the region needs to be removed 589 * from the flush queue. If false, when we were called from the main flusher 590 * run loop and we got the entry to flush by calling poll on the flush queue 591 * (which removed it). 592 * @param families stores of region to flush. 593 * @return true if the region was successfully flushed, false otherwise. If false, there will be 594 * accompanying log messages explaining why the region was not flushed. 595 */ 596 private boolean flushRegion(HRegion region, boolean emergencyFlush, List<byte[]> families, 597 FlushLifeCycleTracker tracker) { 598 synchronized (this.regionsInQueue) { 599 FlushRegionEntry fqe = this.regionsInQueue.remove(region); 600 // Use the start time of the FlushRegionEntry if available 601 if (fqe != null && emergencyFlush) { 602 // Need to remove from region from delay queue. When NOT an 603 // emergencyFlush, then item was removed via a flushQueue.poll. 604 flushQueue.remove(fqe); 605 } 606 } 607 608 tracker.beforeExecution(); 609 lock.readLock().lock(); 610 final CompactSplit compactSplitThread = server.getCompactSplitThread(); 611 try { 612 notifyFlushRequest(region, emergencyFlush); 613 FlushResult flushResult = region.flushcache(families, false, tracker); 614 boolean shouldCompact = flushResult.isCompactionNeeded(); 615 // We just want to check the size 616 boolean shouldSplit = region.checkSplit().isPresent(); 617 if (shouldSplit) { 618 compactSplitThread.requestSplit(region); 619 } else if (shouldCompact) { 620 compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); 621 } 622 } catch (DroppedSnapshotException ex) { 623 // Cache flush can fail in a few places. If it fails in a critical 624 // section, we get a DroppedSnapshotException and a replay of wal 625 // is required. Currently the only way to do this is a restart of 626 // the server. Abort because hdfs is probably bad (HBASE-644 is a case 627 // where hdfs was bad but passed the hdfs check). 628 server.abort("Replay of WAL required. Forcing server shutdown", ex); 629 return false; 630 } catch (IOException ex) { 631 ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 632 LOG.error("Cache flush failed" + (region != null 633 ? (" for region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName())) 634 : ""), ex); 635 if (!server.checkFileSystem()) { 636 return false; 637 } 638 } finally { 639 lock.readLock().unlock(); 640 wakeUpIfBlocking(); 641 tracker.afterExecution(); 642 } 643 return true; 644 } 645 646 private void notifyFlushRequest(Region region, boolean emergencyFlush) { 647 FlushType type = null; 648 if (emergencyFlush) { 649 type = isAboveHighWaterMark(); 650 } 651 if (type == null) { 652 type = isAboveLowWaterMark(); 653 } 654 for (FlushRequestListener listener : flushRequestListeners) { 655 listener.flushRequested(type, region); 656 } 657 } 658 659 private void wakeUpIfBlocking() { 660 synchronized (blockSignal) { 661 blockSignal.notifyAll(); 662 } 663 } 664 665 private boolean isTooManyStoreFiles(Region region) { 666 667 // When compaction is disabled, the region is flushable 668 if (!region.getTableDescriptor().isCompactionEnabled()) { 669 return false; 670 } 671 672 for (Store store : region.getStores()) { 673 if (store.hasTooManyStoreFiles()) { 674 return true; 675 } 676 } 677 return false; 678 } 679 680 private int getStoreFileCount(Region region) { 681 int count = 0; 682 for (Store store : region.getStores()) { 683 count += store.getStorefilesCount(); 684 } 685 return count; 686 } 687 688 /** 689 * Check if the regionserver's memstore memory usage is greater than the limit. If so, flush 690 * regions with the biggest memstores until we're down to the lower limit. This method blocks 691 * callers until we're down to a safe amount of memstore consumption. 692 */ 693 public void reclaimMemStoreMemory() { 694 Span span = 695 TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan(); 696 try (Scope scope = span.makeCurrent()) { 697 FlushType flushType = isAboveHighWaterMark(); 698 if (flushType != FlushType.NORMAL) { 699 span.addEvent("Force Flush. We're above high water mark."); 700 long start = EnvironmentEdgeManager.currentTime(); 701 long nextLogTimeMs = start; 702 synchronized (this.blockSignal) { 703 boolean blocked = false; 704 long startTime = 0; 705 boolean interrupted = false; 706 try { 707 flushType = isAboveHighWaterMark(); 708 while (flushType != FlushType.NORMAL && !server.isStopped()) { 709 if (!blocked) { 710 startTime = EnvironmentEdgeManager.currentTime(); 711 if (!server.getRegionServerAccounting().isOffheap()) { 712 logMsg("global memstore heapsize", 713 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 714 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 715 } else { 716 switch (flushType) { 717 case ABOVE_OFFHEAP_HIGHER_MARK: 718 logMsg("the global offheap memstore datasize", 719 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), 720 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 721 break; 722 case ABOVE_ONHEAP_HIGHER_MARK: 723 logMsg("global memstore heapsize", 724 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 725 server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); 726 break; 727 default: 728 break; 729 } 730 } 731 } 732 blocked = true; 733 wakeupFlushThread(); 734 try { 735 // we should be able to wait forever, but we've seen a bug where 736 // we miss a notify, so put a 5 second bound on it at least. 737 blockSignal.wait(5 * 1000); 738 } catch (InterruptedException ie) { 739 LOG.warn("Interrupted while waiting"); 740 interrupted = true; 741 } 742 long nowMs = EnvironmentEdgeManager.currentTime(); 743 if (nowMs >= nextLogTimeMs) { 744 LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start); 745 nextLogTimeMs = nowMs + 1000; 746 } 747 flushType = isAboveHighWaterMark(); 748 } 749 } finally { 750 if (interrupted) { 751 Thread.currentThread().interrupt(); 752 } 753 } 754 755 if (blocked) { 756 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; 757 if (totalTime > 0) { 758 this.updatesBlockedMsHighWater.add(totalTime); 759 } 760 LOG.info("Unblocking updates for server " + server.toString()); 761 } 762 } 763 } else { 764 flushType = isAboveLowWaterMark(); 765 if (flushType != FlushType.NORMAL) { 766 wakeupFlushThread(); 767 } 768 span.end(); 769 } 770 } 771 } 772 773 private void logMsg(String type, long val, long max) { 774 LOG.info("Blocking updates: {} {} is >= blocking {}", type, 775 TraditionalBinaryPrefix.long2String(val, "", 1), 776 TraditionalBinaryPrefix.long2String(max, "", 1)); 777 } 778 779 @Override 780 public String toString() { 781 return "flush_queue=" + flushQueue.size(); 782 } 783 784 public String dumpQueue() { 785 StringBuilder queueList = new StringBuilder(); 786 queueList.append("Flush Queue Queue dump:\n"); 787 queueList.append(" Flush Queue:\n"); 788 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator(); 789 790 while (it.hasNext()) { 791 queueList.append(" " + it.next().toString()); 792 queueList.append("\n"); 793 } 794 795 return queueList.toString(); 796 } 797 798 /** 799 * Register a MemstoreFlushListener n 800 */ 801 @Override 802 public void registerFlushRequestListener(final FlushRequestListener listener) { 803 this.flushRequestListeners.add(listener); 804 } 805 806 /** 807 * Unregister the listener from MemstoreFlushListeners n * @return true when passed listener is 808 * unregistered successfully. 809 */ 810 @Override 811 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) { 812 return this.flushRequestListeners.remove(listener); 813 } 814 815 /** 816 * Sets the global memstore limit to a new size. n 817 */ 818 @Override 819 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 820 this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize); 821 reclaimMemStoreMemory(); 822 } 823 824 interface FlushQueueEntry extends Delayed { 825 } 826 827 /** 828 * Datastructure used in the flush queue. Holds region and retry count. Keeps tabs on how old this 829 * object is. Implements {@link Delayed}. On construction, the delay is zero. When added to a 830 * delay queue, we'll come out near immediately. Call {@link #requeue(long)} passing delay in 831 * milliseconds before readding to delay queue if you want it to stay there a while. 832 */ 833 static class FlushRegionEntry implements FlushQueueEntry { 834 private final HRegion region; 835 836 private final long createTime; 837 private long whenToExpire; 838 private int requeueCount = 0; 839 840 private final List<byte[]> families; 841 842 private final FlushLifeCycleTracker tracker; 843 844 FlushRegionEntry(final HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) { 845 this.region = r; 846 this.createTime = EnvironmentEdgeManager.currentTime(); 847 this.whenToExpire = this.createTime; 848 this.families = families; 849 this.tracker = tracker; 850 } 851 852 /** 853 * n * @return True if we have been delayed > <code>maximumWait</code> milliseconds. 854 */ 855 public boolean isMaximumWait(final long maximumWait) { 856 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; 857 } 858 859 /** 860 * @return True if the entry is a delay flush task 861 */ 862 protected boolean isDelay() { 863 return this.whenToExpire > this.createTime; 864 } 865 866 /** 867 * @return Count of times {@link #requeue(long)} was called; i.e this is number of times we've 868 * been requeued. 869 */ 870 public int getRequeueCount() { 871 return this.requeueCount; 872 } 873 874 public FlushLifeCycleTracker getTracker() { 875 return tracker; 876 } 877 878 /** 879 * @param when When to expire, when to come up out of the queue. Specify in milliseconds. This 880 * method adds EnvironmentEdgeManager.currentTime() to whatever you pass. 881 * @return This. 882 */ 883 public FlushRegionEntry requeue(final long when) { 884 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when; 885 this.requeueCount++; 886 return this; 887 } 888 889 @Override 890 public long getDelay(TimeUnit unit) { 891 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), 892 TimeUnit.MILLISECONDS); 893 } 894 895 @Override 896 public int compareTo(Delayed other) { 897 // Delay is compared first. If there is a tie, compare region's hash code 898 int ret = 899 Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)) 900 .intValue(); 901 if (ret != 0) { 902 return ret; 903 } 904 FlushQueueEntry otherEntry = (FlushQueueEntry) other; 905 return hashCode() - otherEntry.hashCode(); 906 } 907 908 @Override 909 public String toString() { 910 return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]"; 911 } 912 913 @Override 914 public int hashCode() { 915 int hash = (int) getDelay(TimeUnit.MILLISECONDS); 916 return hash ^ region.hashCode(); 917 } 918 919 @Override 920 public boolean equals(Object obj) { 921 if (this == obj) { 922 return true; 923 } 924 if (obj == null || getClass() != obj.getClass()) { 925 return false; 926 } 927 FlushRegionEntry other = (FlushRegionEntry) obj; 928 if ( 929 !Bytes.equals(this.region.getRegionInfo().getRegionName(), 930 other.region.getRegionInfo().getRegionName()) 931 ) { 932 return false; 933 } 934 return compareTo(other) == 0; 935 } 936 } 937}