/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Flushes memstores on request, using a fixed set of {@link FlushHandler} threads that drain a
 * delay queue of flush entries. NOTE: the handlers extend Thread rather than Chore because the
 * sleep time can be interrupted when there is something to do, rather than the Chore sleep time
 * which is invariant.
 * @see FlushRequester
 */
@InterfaceAudience.Private
public class MemStoreFlusher implements FlushRequester {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  private Configuration conf;
  // These two data members go together. Any entry in the one must have
  // a corresponding entry in the other.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  // True while a WAKEUPFLUSH_INSTANCE token is pending in flushQueue; keeps us from stacking
  // up more than one wake-up token at a time.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  private final long threadWakeFrequency;
  private final HRegionServer server;
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Monitor that writers blocked in reclaimMemStoreMemory() wait on.
  private final Object blockSignal = new Object();

  private long blockingWaitTime;
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);

  /**
   * Singleton instance inserted into flush queue used for signaling.
   */
  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

    @Override
    public int compareTo(Delayed o) {
      return -1;
    }

    @Override
    public boolean equals(Object obj) {
      return obj == this;
    }

    @Override
    public int hashCode() {
      return 42;
    }
  };

  /**
   * Creates the flusher; handler threads are not created until {@link #start} is called.
   * @param conf   configuration supplying the wake frequency, blocking wait time and handler count
   * @param server hosting region server; when null the handler-count check and limit logging are
   *               skipped
   */
  public MemStoreFlusher(final Configuration conf, final HRegionServer server) {
    this.conf = conf;
    this.server = server;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    if (server != null) {
      if (handlerCount < 1) {
        LOG.warn(
          "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
          handlerCount);
        handlerCount = 1;
      }
      LOG.info("globalMemStoreLimit="
        + TraditionalBinaryPrefix
          .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
        + ", globalMemStoreLimitLowMark="
        + TraditionalBinaryPrefix.long2String(
          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
        + ", Offheap=" + (this.server.getRegionServerAccounting().isOffheap()));
    }
    this.flushHandlers = new FlushHandler[handlerCount];
  }

  /** Returns the accumulator of milliseconds that updates were blocked above the high mark. */
  public LongAdder getUpdatesBlockedMsHighWater() {
    return this.updatesBlockedMsHighWater;
  }

  /**
   * The memstore across all regions has exceeded the low water mark. Pick one region to flush and
   * flush it synchronously (this is called from the flush thread).
   * @param flushType which watermark was breached; selects on-heap vs off-heap sizing
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure(FlushType flushType) {
    SortedMap<Long, HRegion> regionsBySize = null;
    switch (flushType) {
      case ABOVE_OFFHEAP_HIGHER_MARK:
      case ABOVE_OFFHEAP_LOWER_MARK:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
        break;
      case ABOVE_ONHEAP_HIGHER_MARK:
      case ABOVE_ONHEAP_LOWER_MARK:
      default:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
    }
    Set<HRegion> excludedRegions = new HashSet<>();

    double secondaryMultiplier =
      ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles (might be null!)
      HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
      // Find the biggest region, total, even if it might have too many flushes.
      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
      // Find the biggest region that is a secondary region
      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
      if (bestAnyRegion == null) {
        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
        bestAnyRegion = bestRegionReplica;
      }
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      long bestAnyRegionSize = getMemStoreSize(flushType, bestAnyRegion);
      long bestFlushableRegionSize = getMemStoreSize(flushType, bestFlushableRegion);

      // regionToFlush is never null from here on: bestAnyRegion was null-checked above and
      // bestFlushableRegion is only chosen when it is non-null.
      HRegion regionToFlush;
      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " + "Region "
            + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many "
            + "store files, but is " + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
            + " vs best flushable region's "
            + TraditionalBinaryPrefix.long2String(bestFlushableRegionSize, "", 1)
            + ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else if (bestFlushableRegion == null) {
        regionToFlush = bestAnyRegion;
      } else {
        regionToFlush = bestFlushableRegion;
      }

      long regionToFlushSize = getMemStoreSize(flushType, regionToFlush);
      long bestRegionReplicaSize = getMemStoreSize(flushType, bestRegionReplica);

      if (regionToFlushSize == 0 && bestRegionReplicaSize == 0) {
        // A concurrency issue (such as splitting region) may happen such that the online region
        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
        // getBiggestMemStoreRegion(). This means that we can come out of the loop
        LOG.debug("Above memory mark but there is no flushable region");
        return false;
      }

      if (
        bestRegionReplica != null
          && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf)
          && (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize)
      ) {
        LOG.info("Refreshing storefiles of region " + bestRegionReplica
          + " due to global heap pressure. Total memstore off heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
          + " memstore heap size=" + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
        if (!flushedOne) {
          LOG.info("Excluding secondary region " + bestRegionReplica
            + " - trying to find a different region to refresh files.");
          excludedRegions.add(bestRegionReplica);
        }
      } else {
        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
          + "Flush type=" + flushType.toString() + ", Total Memstore Heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)
          + ", Total Memstore Off-Heap size="
          + TraditionalBinaryPrefix
            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
          + ", Region memstore size="
          + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);

        if (!flushedOne) {
          LOG.info("Excluding unflushable region " + regionToFlush
            + " - trying to find a different region to flush.");
          excludedRegions.add(regionToFlush);
        }
      }
    }
    return true;
  }

  /**
   * Returns the memstore size of <code>r</code> that is relevant for <code>flushType</code>:
   * off-heap size for the off-heap marks, heap size for the on-heap marks, data size otherwise.
   * Returns 0 if <code>r</code> is null.
   */
  private static long getMemStoreSize(FlushType flushType, HRegion r) {
    switch (flushType) {
      case ABOVE_OFFHEAP_HIGHER_MARK:
      case ABOVE_OFFHEAP_LOWER_MARK:
        return getMemStoreOffHeapSize(r);
      case ABOVE_ONHEAP_HIGHER_MARK:
      case ABOVE_ONHEAP_LOWER_MARK:
        return getMemStoreHeapSize(r);
      default:
        return getMemStoreDataSize(r);
    }
  }

  /** Returns memstore offheap size, or 0 if <code>r</code> is null */
  private static long getMemStoreOffHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreOffHeapSize();
  }

  /** Returns memstore heap size, or 0 if <code>r</code> is null */
  private static long getMemStoreHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreHeapSize();
  }

  /** Returns memstore data size, or 0 if <code>r</code> is null */
  private static long getMemStoreDataSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreDataSize();
  }

  /**
   * Worker thread that polls the flush queue and either flushes the polled region or, on a
   * timeout / wake-up token, checks for global memstore pressure.
   */
  private class FlushHandler extends Thread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
            FlushType type = isAboveLowWaterMark();
            if (type != FlushType.NORMAL) {
              LOG.debug("Flush thread woke up because memory above low water="
                + TraditionalBinaryPrefix.long2String(
                  server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
              // For offheap memstore, even if the lower water mark was breached due to heap
              // overhead we still select the regions based on the region's memstore data size.
              // TODO : If we want to decide based on heap over head it can be done without
              // tracking it per region.
              if (!flushOneForGlobalPressure(type)) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException | ConcurrentModificationException ex) {
          // Go around again; the loop condition re-checks server.isStopped().
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }

  /** Enqueues the wake-up token unless one is already pending. */
  private void wakeupFlushThread() {
    if (wakeupPending.compareAndSet(false, true)) {
      flushQueue.add(WAKEUPFLUSH_INSTANCE);
    }
  }

  /**
   * Returns the first region in <code>regionsBySize</code> iteration order that is not excluded,
   * is not already flushing, has writes enabled and (when <code>checkStoreFileCount</code> is set)
   * does not have too many store files; null if there is none.
   */
  private HRegion getBiggestMemStoreRegion(SortedMap<Long, HRegion> regionsBySize,
    Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (region.writestate.flushing || !region.writestate.writesEnabled) {
          continue;
        }

        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
          continue;
        }
        return region;
      }
    }
    return null;
  }

  /**
   * Returns the first non-excluded region in <code>regionsBySize</code> iteration order that is
   * not a default replica; null if there is none.
   */
  private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
    Set<HRegion> excludedRegions) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
          continue;
        }

        return region;
      }
    }
    return null;
  }

  /**
   * Refreshes the store files of <code>region</code>.
   * @return the result of {@link Region#refreshStoreFiles()}, or false if it threw
   */
  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
    try {
      return region.refreshStoreFiles();
    } catch (IOException e) {
      LOG.warn("Refreshing store files failed with exception", e);
    }
    return false;
  }

  /**
   * Return the FlushType if global memory usage is above the high watermark
   */
  private FlushType isAboveHighWaterMark() {
    return server.getRegionServerAccounting().isAboveHighWaterMark();
  }

  /**
   * Return the FlushType if we're above the low watermark
   */
  private FlushType isAboveLowWaterMark() {
    return server.getRegionServerAccounting().isAboveLowWaterMark();
  }

  @Override
  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
    return this.requestFlush(r, null, tracker);
  }

  @Override
  public boolean requestFlush(HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
    synchronized (regionsInQueue) {
      FlushRegionEntry existFqe = regionsInQueue.get(r);
      if (existFqe != null) {
        // if a delayed one exists and not reach the time to execute, just remove it
        if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
          LOG.info("Remove the existing delayed flush entry for {}, "
            + "because we need to flush it immediately", r);
          this.regionsInQueue.remove(r);
          this.flushQueue.remove(existFqe);
          r.decrementFlushesQueuedCount();
        } else {
          tracker.notExecuted("Flush already requested on " + r);
          return false;
        }
      }

      // This entry has no delay so it will be added at the top of the flush
      // queue. It'll come out near immediately.
      FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
      this.regionsInQueue.put(r, fqe);
      this.flushQueue.add(fqe);
      r.incrementFlushesQueuedCount();
      return true;
    }
  }

  @Override
  public boolean requestDelayedFlush(HRegion r, long delay) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
        FlushRegionEntry fqe = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
        r.incrementFlushesQueuedCount();
        return true;
      }
      return false;
    }
  }

  /** Returns the number of entries currently in the flush queue. */
  public int getFlushQueueSize() {
    return flushQueue.size();
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    // Flushes hold the read lock, so taking the write lock waits for in-flight flushes.
    lock.writeLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null) {
          flushHandler.interrupt();
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Creates and starts the flush handler threads.
   * <p>
   * The daemon flag and uncaught-exception handler are set directly on each handler. Passing a
   * handler to a ThreadFactory would only wrap it in a second thread, which would receive those
   * settings while the handler itself (the thread that is actually started and later interrupted
   * and joined) would not.
   * @param eh handler to install for uncaught exceptions on every flush handler thread
   */
  synchronized void start(UncaughtExceptionHandler eh) {
    for (int i = 0; i < flushHandlers.length; i++) {
      FlushHandler handler = new FlushHandler("MemStoreFlusher." + i);
      handler.setDaemon(true);
      handler.setUncaughtExceptionHandler(eh);
      flushHandlers[i] = handler;
      handler.start();
    }
  }

  /** Returns true if at least one flush handler thread is alive. */
  boolean isAlive() {
    for (FlushHandler flushHandler : flushHandlers) {
      if (flushHandler != null && flushHandler.isAlive()) {
        return true;
      }
    }
    return false;
  }

  /** Shuts down every flush handler thread that was created. */
  void join() {
    for (FlushHandler flushHandler : flushHandlers) {
      if (flushHandler != null) {
        Threads.shutdown(flushHandler);
      }
    }
  }

  /**
   * A flushRegion that checks store file count. If too many, puts the flush on delay queue to retry
   * later.
   * @param fqe queue entry describing the region to flush
   * @return true if the region was successfully flushed (or requeued for a later attempt), false
   *         otherwise. If false, there will be accompanying log messages explaining why the region
   *         was not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime)
          + "ms on a compaction to clean up 'too many store files'; waited "
          + "long enough... proceeding with flush of "
          + region.getRegionInfo().getRegionNameAsString());
      } else {
        // If this is first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
            region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
            this.blockingWaitTime);
          final CompactSplit compactSplitThread = server.getCompactSplitThread();
          if (!compactSplitThread.requestSplit(region)) {
            try {
              compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
            } catch (IOException e) {
              e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
              LOG.error("Cache flush failed for region "
                + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
            }
          }
        }

        // Put back on the queue. Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    return flushRegion(region, false, fqe.families, fqe.getTracker());
  }

  /**
   * Flush a region.
   * @param region         Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region needs to be removed
   *                       from the flush queue. If false, when we were called from the main flusher
   *                       run loop and we got the entry to flush by calling poll on the flush queue
   *                       (which removed it).
   * @param families       stores of region to flush.
   * @param tracker        notified before and after the flush is executed.
   * @return true if the region was successfully flushed, false otherwise. If false, there will be
   *         accompanying log messages explaining why the region was not flushed.
   */
  private boolean flushRegion(HRegion region, boolean emergencyFlush, List<byte[]> families,
    FlushLifeCycleTracker tracker) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue. When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }

    tracker.beforeExecution();
    lock.readLock().lock();
    final CompactSplit compactSplitThread = server.getCompactSplitThread();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flushcache(families, false, tracker);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit().isPresent();
      if (shouldSplit) {
        compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error("Cache flush failed" + (region != null
        ? (" for region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
        : ""), ex);
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
      tracker.afterExecution();
    }
    return true;
  }

  /**
   * Tells every registered listener that a flush of <code>region</code> was requested, passing
   * the high-watermark flush type for emergency flushes and the low-watermark flush type
   * otherwise (or when the high-watermark check returned null).
   */
  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
    FlushType type = null;
    if (emergencyFlush) {
      type = isAboveHighWaterMark();
    }
    if (type == null) {
      type = isAboveLowWaterMark();
    }
    for (FlushRequestListener listener : flushRequestListeners) {
      listener.flushRequested(type, region);
    }
  }

  /** Wakes every thread waiting on the block signal so it re-checks the memstore state. */
  private void wakeUpIfBlocking() {
    synchronized (blockSignal) {
      blockSignal.notifyAll();
    }
  }

  /**
   * Returns true if compaction is enabled for the region's table and at least one of its stores
   * reports too many store files.
   */
  private boolean isTooManyStoreFiles(Region region) {

    // When compaction is disabled, the region is flushable
    if (!region.getTableDescriptor().isCompactionEnabled()) {
      return false;
    }

    for (Store store : region.getStores()) {
      if (store.hasTooManyStoreFiles()) {
        return true;
      }
    }
    return false;
  }

  /** Returns the total store file count across all stores of <code>region</code>. */
  private int getStoreFileCount(Region region) {
    int count = 0;
    for (Store store : region.getStores()) {
      count += store.getStorefilesCount();
    }
    return count;
  }

  /**
   * Check if the regionserver's memstore memory usage is greater than the limit. If so, flush
   * regions with the biggest memstores until we're down to the lower limit. This method blocks
   * callers until we're down to a safe amount of memstore consumption.
   */
  public void reclaimMemStoreMemory() {
    Span span =
      TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan();
    try (Scope scope = span.makeCurrent()) {
      FlushType flushType = isAboveHighWaterMark();
      if (flushType != FlushType.NORMAL) {
        span.addEvent("Force Flush. We're above high water mark.");
        long start = EnvironmentEdgeManager.currentTime();
        long nextLogTimeMs = start;
        synchronized (this.blockSignal) {
          boolean blocked = false;
          long startTime = 0;
          boolean interrupted = false;
          try {
            flushType = isAboveHighWaterMark();
            while (flushType != FlushType.NORMAL && !server.isStopped()) {
              if (!blocked) {
                startTime = EnvironmentEdgeManager.currentTime();
                if (!server.getRegionServerAccounting().isOffheap()) {
                  logMsg("global memstore heapsize",
                    server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                    server.getRegionServerAccounting().getGlobalMemStoreLimit());
                } else {
                  switch (flushType) {
                    case ABOVE_OFFHEAP_HIGHER_MARK:
                      logMsg("the global offheap memstore datasize",
                        server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
                        server.getRegionServerAccounting().getGlobalMemStoreLimit());
                      break;
                    case ABOVE_ONHEAP_HIGHER_MARK:
                      logMsg("global memstore heapsize",
                        server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                        server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
                      break;
                    default:
                      break;
                  }
                }
              }
              blocked = true;
              wakeupFlushThread();
              try {
                // we should be able to wait forever, but we've seen a bug where
                // we miss a notify, so put a 5 second bound on it at least.
                blockSignal.wait(5 * 1000);
              } catch (InterruptedException ie) {
                LOG.warn("Interrupted while waiting");
                // Keep blocking; the interrupt status is restored once the wait loop ends.
                interrupted = true;
              }
              long nowMs = EnvironmentEdgeManager.currentTime();
              if (nowMs >= nextLogTimeMs) {
                LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
                nextLogTimeMs = nowMs + 1000;
              }
              flushType = isAboveHighWaterMark();
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }

          if (blocked) {
            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
            if (totalTime > 0) {
              this.updatesBlockedMsHighWater.add(totalTime);
            }
            LOG.info("Unblocking updates for server " + server.toString());
          }
        }
      } else {
        flushType = isAboveLowWaterMark();
        if (flushType != FlushType.NORMAL) {
          wakeupFlushThread();
        }
      }
    } finally {
      // End the span on every path (blocking branch and exceptions included), not only when
      // we were below the high water mark.
      span.end();
    }
  }

  /** Logs that updates are being blocked because <code>val</code> reached <code>max</code>. */
  private void logMsg(String type, long val, long max) {
    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
      TraditionalBinaryPrefix.long2String(val, "", 1),
      TraditionalBinaryPrefix.long2String(max, "", 1));
  }

  @Override
  public String toString() {
    return "flush_queue=" + flushQueue.size();
  }

  /** Returns a multi-line, human-readable listing of the entries in the flush queue. */
  public String dumpQueue() {
    StringBuilder queueList = new StringBuilder();
    queueList.append("Flush Queue Queue dump:\n");
    queueList.append(" Flush Queue:\n");
    for (FlushQueueEntry entry : flushQueue) {
      queueList.append(" ").append(entry.toString());
      queueList.append("\n");
    }

    return queueList.toString();
  }

  /**
   * Register a MemstoreFlushListener.
   * @param listener listener to notify of flush requests
   */
  @Override
  public void registerFlushRequestListener(final FlushRequestListener listener) {
    this.flushRequestListeners.add(listener);
  }

  /**
   * Unregister the listener from MemstoreFlushListeners.
   * @param listener listener to remove
   * @return true when passed listener is unregistered successfully.
   */
  @Override
  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
    return this.flushRequestListeners.remove(listener);
  }

  /**
   * Sets the global memstore limit to a new size, then reclaims memstore memory against it.
   * @param globalMemStoreSize the new global memstore limit
   */
  @Override
  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
    reclaimMemStoreMemory();
  }

  /** Marker type for everything that can be placed on the flush queue. */
  interface FlushQueueEntry extends Delayed {
  }

  /**
   * Datastructure used in the flush queue. Holds region and retry count. Keeps tabs on how old this
   * object is. Implements {@link Delayed}. On construction, the delay is zero. When added to a
   * delay queue, we'll come out near immediately. Call {@link #requeue(long)} passing delay in
   * milliseconds before readding to delay queue if you want it to stay there a while.
   */
  static class FlushRegionEntry implements FlushQueueEntry {
    private final HRegion region;

    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;

    private final List<byte[]> families;

    private final FlushLifeCycleTracker tracker;

    FlushRegionEntry(final HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
      this.families = families;
      this.tracker = tracker;
    }

    /**
     * @param maximumWait wait limit in milliseconds
     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
     */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }

    /** Returns True if the entry is a delay flush task */
    protected boolean isDelay() {
      return this.whenToExpire > this.createTime;
    }

    /**
     * @return Count of times {@link #requeue(long)} was called; i.e this is number of times we've
     *         been requeued.
     */
    public int getRequeueCount() {
      return this.requeueCount;
    }

    public FlushLifeCycleTracker getTracker() {
      return tracker;
    }

    /**
     * @param when When to expire, when to come up out of the queue. Specify in milliseconds. This
     *             method adds EnvironmentEdgeManager.currentTime() to whatever you pass.
     * @return This.
     */
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
        TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      // Delay is compared first. If there is a tie, compare region's hash code.
      // Long.compare / Integer.compare are used instead of subtraction so that large differences
      // cannot truncate or overflow into the wrong sign.
      int ret =
        Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
      if (ret != 0) {
        return ret;
      }
      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
      return Integer.compare(hashCode(), otherEntry.hashCode());
    }

    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
    }

    @Override
    public int hashCode() {
      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
      return hash ^ region.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      FlushRegionEntry other = (FlushRegionEntry) obj;
      if (
        !Bytes.equals(this.region.getRegionInfo().getRegionName(),
          other.region.getRegionInfo().getRegionName())
      ) {
        return false;
      }
      return compareTo(other) == 0;
    }
  }
}