001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.lang.Thread.UncaughtExceptionHandler; 023import java.util.ArrayList; 024import java.util.ConcurrentModificationException; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.SortedMap; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.DelayQueue; 033import java.util.concurrent.Delayed; 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.LongAdder; 038import java.util.concurrent.locks.ReentrantReadWriteLock; 039 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.DroppedSnapshotException; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.client.RegionReplicaUtil; 044import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 045import org.apache.hadoop.hbase.trace.TraceUtil; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.hadoop.hbase.util.HasThread; 049import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 050import org.apache.hadoop.hbase.util.Threads; 051import org.apache.hadoop.ipc.RemoteException; 052import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; 053import org.apache.htrace.core.TraceScope; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * Thread that flushes cache on request 060 * 061 * NOTE: This class extends Thread rather than Chore because the sleep time 062 * can be interrupted when there is something to do, rather than the Chore 063 * sleep time which is invariant. 064 * 065 * @see FlushRequester 066 */ 067@InterfaceAudience.Private 068class MemStoreFlusher implements FlushRequester { 069 private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class); 070 071 private Configuration conf; 072 // These two data members go together. Any entry in the one must have 073 // a corresponding entry in the other. 074 private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>(); 075 private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>(); 076 private AtomicBoolean wakeupPending = new AtomicBoolean(); 077 078 private final long threadWakeFrequency; 079 private final HRegionServer server; 080 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 081 private final Object blockSignal = new Object(); 082 083 private long blockingWaitTime; 084 private final LongAdder updatesBlockedMsHighWater = new LongAdder(); 085 086 private final FlushHandler[] flushHandlers; 087 private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1); 088 089 private FlushType flushType; 090 091 /** 092 * Singleton instance inserted into flush queue used for signaling. 093 */ 094 private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() { 095 @Override 096 public long getDelay(TimeUnit unit) { 097 return 0; 098 } 099 100 @Override 101 public int compareTo(Delayed o) { 102 return -1; 103 } 104 105 @Override 106 public boolean equals(Object obj) { 107 return obj == this; 108 } 109 110 @Override 111 public int hashCode() { 112 return 42; 113 } 114 }; 115 116 117 /** 118 * @param conf 119 * @param server 120 */ 121 public MemStoreFlusher(final Configuration conf, 122 final HRegionServer server) { 123 super(); 124 this.conf = conf; 125 this.server = server; 126 this.threadWakeFrequency = 127 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 128 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 129 90000); 130 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); 131 this.flushHandlers = new FlushHandler[handlerCount]; 132 LOG.info("globalMemStoreLimit=" 133 + TraditionalBinaryPrefix 134 .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) 135 + ", globalMemStoreLimitLowMark=" 136 + TraditionalBinaryPrefix.long2String( 137 this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) 138 + ", Offheap=" 139 + (this.server.getRegionServerAccounting().isOffheap())); 140 } 141 142 public LongAdder getUpdatesBlockedMsHighWater() { 143 return this.updatesBlockedMsHighWater; 144 } 145 146 public void setFlushType(FlushType flushType) { 147 this.flushType = flushType; 148 } 149 150 /** 151 * The memstore across all regions has exceeded the low water mark. Pick 152 * one region to flush and flush it synchronously (this is called from the 153 * flush thread) 154 * @return true if successful 155 */ 156 private boolean flushOneForGlobalPressure() { 157 SortedMap<Long, HRegion> regionsBySize = null; 158 switch(flushType) { 159 case ABOVE_OFFHEAP_HIGHER_MARK: 160 case ABOVE_OFFHEAP_LOWER_MARK: 161 regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize(); 162 break; 163 case ABOVE_ONHEAP_HIGHER_MARK: 164 case ABOVE_ONHEAP_LOWER_MARK: 165 default: 166 regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize(); 167 } 168 Set<HRegion> excludedRegions = new HashSet<>(); 169 170 double secondaryMultiplier 171 = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf); 172 173 boolean flushedOne = false; 174 while (!flushedOne) { 175 // Find the biggest region that doesn't have too many storefiles (might be null!) 176 HRegion bestFlushableRegion = 177 getBiggestMemStoreRegion(regionsBySize, excludedRegions, true); 178 // Find the biggest region, total, even if it might have too many flushes. 179 HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false); 180 // Find the biggest region that is a secondary region 181 HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions); 182 if (bestAnyRegion == null) { 183 // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null 184 bestAnyRegion = bestRegionReplica; 185 } 186 if (bestAnyRegion == null) { 187 LOG.error("Above memory mark but there are no flushable regions!"); 188 return false; 189 } 190 191 HRegion regionToFlush; 192 long bestAnyRegionSize; 193 long bestFlushableRegionSize; 194 switch(flushType) { 195 case ABOVE_OFFHEAP_HIGHER_MARK: 196 case ABOVE_OFFHEAP_LOWER_MARK: 197 bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize(); 198 bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion); 199 break; 200 201 case ABOVE_ONHEAP_HIGHER_MARK: 202 case ABOVE_ONHEAP_LOWER_MARK: 203 bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize(); 204 bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion); 205 break; 206 207 default: 208 bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize(); 209 bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion); 210 } 211 if (bestAnyRegionSize > 2 * bestFlushableRegionSize) { 212 // Even if it's not supposed to be flushed, pick a region if it's more than twice 213 // as big as the best flushable one - otherwise when we're under pressure we make 214 // lots of little flushes and cause lots of compactions, etc, which just makes 215 // life worse! 216 if (LOG.isDebugEnabled()) { 217 LOG.debug("Under global heap pressure: " + "Region " 218 + bestAnyRegion.getRegionInfo().getRegionNameAsString() 219 + " has too many " + "store files, but is " 220 + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1) 221 + " vs best flushable region's " 222 + TraditionalBinaryPrefix.long2String( 223 bestFlushableRegionSize, "", 1) 224 + ". Choosing the bigger."); 225 } 226 regionToFlush = bestAnyRegion; 227 } else { 228 if (bestFlushableRegion == null) { 229 regionToFlush = bestAnyRegion; 230 } else { 231 regionToFlush = bestFlushableRegion; 232 } 233 } 234 235 long regionToFlushSize; 236 long bestRegionReplicaSize; 237 switch(flushType) { 238 case ABOVE_OFFHEAP_HIGHER_MARK: 239 case ABOVE_OFFHEAP_LOWER_MARK: 240 regionToFlushSize = regionToFlush.getMemStoreOffHeapSize(); 241 bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica); 242 break; 243 244 case ABOVE_ONHEAP_HIGHER_MARK: 245 case ABOVE_ONHEAP_LOWER_MARK: 246 regionToFlushSize = regionToFlush.getMemStoreHeapSize(); 247 bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica); 248 break; 249 250 default: 251 regionToFlushSize = regionToFlush.getMemStoreDataSize(); 252 bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica); 253 } 254 255 if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) { 256 // A concurrency issue (such as splitting region) may happen such that the online region 257 // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to 258 // getBiggestMemStoreRegion(). This means that we can come out of the loop 259 LOG.debug("Above memory mark but there is no flushable region"); 260 return false; 261 } 262 263 if (regionToFlush == null || 264 (bestRegionReplica != null && 265 ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) && 266 (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) { 267 LOG.info("Refreshing storefiles of region " + bestRegionReplica + 268 " due to global heap pressure. Total memstore off heap size=" + 269 TraditionalBinaryPrefix.long2String( 270 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + 271 " memstore heap size=" + TraditionalBinaryPrefix.long2String( 272 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)); 273 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); 274 if (!flushedOne) { 275 LOG.info("Excluding secondary region " + bestRegionReplica + 276 " - trying to find a different region to refresh files."); 277 excludedRegions.add(bestRegionReplica); 278 } 279 } else { 280 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " + 281 "Flush type=" + flushType.toString() + 282 ", Total Memstore Heap size=" + 283 TraditionalBinaryPrefix.long2String( 284 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) + 285 ", Total Memstore Off-Heap size=" + 286 TraditionalBinaryPrefix.long2String( 287 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + 288 ", Region memstore size=" + 289 TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1)); 290 flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY); 291 292 if (!flushedOne) { 293 LOG.info("Excluding unflushable region " + regionToFlush + 294 " - trying to find a different region to flush."); 295 excludedRegions.add(regionToFlush); 296 } 297 } 298 } 299 return true; 300 } 301 302 /** 303 * @return Return memstore offheap size or null if <code>r</code> is null 304 */ 305 private static long getMemStoreOffHeapSize(HRegion r) { 306 return r == null? 0: r.getMemStoreOffHeapSize(); 307 } 308 309 /** 310 * @return Return memstore heap size or null if <code>r</code> is null 311 */ 312 private static long getMemStoreHeapSize(HRegion r) { 313 return r == null? 0: r.getMemStoreHeapSize(); 314 } 315 316 /** 317 * @return Return memstore data size or null if <code>r</code> is null 318 */ 319 private static long getMemStoreDataSize(HRegion r) { 320 return r == null? 0: r.getMemStoreDataSize(); 321 } 322 323 private class FlushHandler extends HasThread { 324 325 private FlushHandler(String name) { 326 super(name); 327 } 328 329 @Override 330 public void run() { 331 while (!server.isStopped()) { 332 FlushQueueEntry fqe = null; 333 try { 334 wakeupPending.set(false); // allow someone to wake us up again 335 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); 336 if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) { 337 FlushType type = isAboveLowWaterMark(); 338 if (type != FlushType.NORMAL) { 339 LOG.debug("Flush thread woke up because memory above low water=" 340 + TraditionalBinaryPrefix.long2String( 341 server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)); 342 // For offheap memstore, even if the lower water mark was breached due to heap overhead 343 // we still select the regions based on the region's memstore data size. 344 // TODO : If we want to decide based on heap over head it can be done without tracking 345 // it per region. 346 if (!flushOneForGlobalPressure()) { 347 // Wasn't able to flush any region, but we're above low water mark 348 // This is unlikely to happen, but might happen when closing the 349 // entire server - another thread is flushing regions. We'll just 350 // sleep a little bit to avoid spinning, and then pretend that 351 // we flushed one, so anyone blocked will check again 352 Thread.sleep(1000); 353 wakeUpIfBlocking(); 354 } 355 // Enqueue another one of these tokens so we'll wake up again 356 wakeupFlushThread(); 357 } 358 continue; 359 } 360 FlushRegionEntry fre = (FlushRegionEntry) fqe; 361 if (!flushRegion(fre)) { 362 break; 363 } 364 } catch (InterruptedException ex) { 365 continue; 366 } catch (ConcurrentModificationException ex) { 367 continue; 368 } catch (Exception ex) { 369 LOG.error("Cache flusher failed for entry " + fqe, ex); 370 if (!server.checkFileSystem()) { 371 break; 372 } 373 } 374 } 375 synchronized (regionsInQueue) { 376 regionsInQueue.clear(); 377 flushQueue.clear(); 378 } 379 380 // Signal anyone waiting, so they see the close flag 381 wakeUpIfBlocking(); 382 LOG.info(getName() + " exiting"); 383 } 384 } 385 386 387 private void wakeupFlushThread() { 388 if (wakeupPending.compareAndSet(false, true)) { 389 flushQueue.add(WAKEUPFLUSH_INSTANCE); 390 } 391 } 392 393 private HRegion getBiggestMemStoreRegion( 394 SortedMap<Long, HRegion> regionsBySize, 395 Set<HRegion> excludedRegions, 396 boolean checkStoreFileCount) { 397 synchronized (regionsInQueue) { 398 for (HRegion region : regionsBySize.values()) { 399 if (excludedRegions.contains(region)) { 400 continue; 401 } 402 403 if (region.writestate.flushing || !region.writestate.writesEnabled) { 404 continue; 405 } 406 407 if (checkStoreFileCount && isTooManyStoreFiles(region)) { 408 continue; 409 } 410 return region; 411 } 412 } 413 return null; 414 } 415 416 private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize, 417 Set<HRegion> excludedRegions) { 418 synchronized (regionsInQueue) { 419 for (HRegion region : regionsBySize.values()) { 420 if (excludedRegions.contains(region)) { 421 continue; 422 } 423 424 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 425 continue; 426 } 427 428 return region; 429 } 430 } 431 return null; 432 } 433 434 private boolean refreshStoreFilesAndReclaimMemory(Region region) { 435 try { 436 return region.refreshStoreFiles(); 437 } catch (IOException e) { 438 LOG.warn("Refreshing store files failed with exception", e); 439 } 440 return false; 441 } 442 443 /** 444 * Return true if global memory usage is above the high watermark 445 */ 446 private FlushType isAboveHighWaterMark() { 447 return server.getRegionServerAccounting().isAboveHighWaterMark(); 448 } 449 450 /** 451 * Return true if we're above the low watermark 452 */ 453 private FlushType isAboveLowWaterMark() { 454 return server.getRegionServerAccounting().isAboveLowWaterMark(); 455 } 456 457 @Override 458 public boolean requestFlush(HRegion r, boolean forceFlushAllStores, 459 FlushLifeCycleTracker tracker) { 460 synchronized (regionsInQueue) { 461 if (!regionsInQueue.containsKey(r)) { 462 // This entry has no delay so it will be added at the top of the flush 463 // queue. It'll come out near immediately. 464 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker); 465 this.regionsInQueue.put(r, fqe); 466 this.flushQueue.add(fqe); 467 r.incrementFlushesQueuedCount(); 468 return true; 469 } else { 470 tracker.notExecuted("Flush already requested on " + r); 471 return false; 472 } 473 } 474 } 475 476 @Override 477 public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { 478 synchronized (regionsInQueue) { 479 if (!regionsInQueue.containsKey(r)) { 480 // This entry has some delay 481 FlushRegionEntry fqe = 482 new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY); 483 fqe.requeue(delay); 484 this.regionsInQueue.put(r, fqe); 485 this.flushQueue.add(fqe); 486 r.incrementFlushesQueuedCount(); 487 return true; 488 } 489 return false; 490 } 491 } 492 493 public int getFlushQueueSize() { 494 return flushQueue.size(); 495 } 496 497 /** 498 * Only interrupt once it's done with a run through the work loop. 499 */ 500 void interruptIfNecessary() { 501 lock.writeLock().lock(); 502 try { 503 for (FlushHandler flushHander : flushHandlers) { 504 if (flushHander != null) flushHander.interrupt(); 505 } 506 } finally { 507 lock.writeLock().unlock(); 508 } 509 } 510 511 synchronized void start(UncaughtExceptionHandler eh) { 512 ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory( 513 server.getServerName().toShortString() + "-MemStoreFlusher", eh); 514 for (int i = 0; i < flushHandlers.length; i++) { 515 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i); 516 flusherThreadFactory.newThread(flushHandlers[i]); 517 flushHandlers[i].start(); 518 } 519 } 520 521 boolean isAlive() { 522 for (FlushHandler flushHander : flushHandlers) { 523 if (flushHander != null && flushHander.isAlive()) { 524 return true; 525 } 526 } 527 return false; 528 } 529 530 void join() { 531 for (FlushHandler flushHander : flushHandlers) { 532 if (flushHander != null) { 533 Threads.shutdown(flushHander.getThread()); 534 } 535 } 536 } 537 538 /** 539 * A flushRegion that checks store file count. If too many, puts the flush 540 * on delay queue to retry later. 541 * @param fqe 542 * @return true if the region was successfully flushed, false otherwise. If 543 * false, there will be accompanying log messages explaining why the region was 544 * not flushed. 545 */ 546 private boolean flushRegion(final FlushRegionEntry fqe) { 547 HRegion region = fqe.region; 548 if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { 549 if (fqe.isMaximumWait(this.blockingWaitTime)) { 550 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) + 551 "ms on a compaction to clean up 'too many store files'; waited " + 552 "long enough... proceeding with flush of " + 553 region.getRegionInfo().getRegionNameAsString()); 554 } else { 555 // If this is first time we've been put off, then emit a log message. 556 if (fqe.getRequeueCount() <= 0) { 557 // Note: We don't impose blockingStoreFiles constraint on meta regions 558 LOG.warn("{} has too many store files({}); delaying flush up to {} ms", 559 region.getRegionInfo().getEncodedName(), getStoreFileCount(region), 560 this.blockingWaitTime); 561 if (!this.server.compactSplitThread.requestSplit(region)) { 562 try { 563 this.server.compactSplitThread.requestSystemCompaction(region, 564 Thread.currentThread().getName()); 565 } catch (IOException e) { 566 e = e instanceof RemoteException ? 567 ((RemoteException)e).unwrapRemoteException() : e; 568 LOG.error("Cache flush failed for region " + 569 Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e); 570 } 571 } 572 } 573 574 // Put back on the queue. Have it come back out of the queue 575 // after a delay of this.blockingWaitTime / 100 ms. 576 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); 577 // Tell a lie, it's not flushed but it's ok 578 return true; 579 } 580 } 581 return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker()); 582 } 583 584 /** 585 * Flush a region. 586 * @param region Region to flush. 587 * @param emergencyFlush Set if we are being force flushed. If true the region 588 * needs to be removed from the flush queue. If false, when we were called 589 * from the main flusher run loop and we got the entry to flush by calling 590 * poll on the flush queue (which removed it). 591 * @param forceFlushAllStores whether we want to flush all store. 592 * @return true if the region was successfully flushed, false otherwise. If 593 * false, there will be accompanying log messages explaining why the region was 594 * not flushed. 595 */ 596 private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores, 597 FlushLifeCycleTracker tracker) { 598 synchronized (this.regionsInQueue) { 599 FlushRegionEntry fqe = this.regionsInQueue.remove(region); 600 // Use the start time of the FlushRegionEntry if available 601 if (fqe != null && emergencyFlush) { 602 // Need to remove from region from delay queue. When NOT an 603 // emergencyFlush, then item was removed via a flushQueue.poll. 604 flushQueue.remove(fqe); 605 } 606 } 607 608 tracker.beforeExecution(); 609 lock.readLock().lock(); 610 try { 611 notifyFlushRequest(region, emergencyFlush); 612 FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker); 613 boolean shouldCompact = flushResult.isCompactionNeeded(); 614 // We just want to check the size 615 boolean shouldSplit = region.checkSplit() != null; 616 if (shouldSplit) { 617 this.server.compactSplitThread.requestSplit(region); 618 } else if (shouldCompact) { 619 server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); 620 } 621 } catch (DroppedSnapshotException ex) { 622 // Cache flush can fail in a few places. If it fails in a critical 623 // section, we get a DroppedSnapshotException and a replay of wal 624 // is required. Currently the only way to do this is a restart of 625 // the server. Abort because hdfs is probably bad (HBASE-644 is a case 626 // where hdfs was bad but passed the hdfs check). 627 server.abort("Replay of WAL required. Forcing server shutdown", ex); 628 return false; 629 } catch (IOException ex) { 630 ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 631 LOG.error( 632 "Cache flush failed" 633 + (region != null ? (" for region " + 634 Bytes.toStringBinary(region.getRegionInfo().getRegionName())) 635 : ""), ex); 636 if (!server.checkFileSystem()) { 637 return false; 638 } 639 } finally { 640 lock.readLock().unlock(); 641 wakeUpIfBlocking(); 642 tracker.afterExecution(); 643 } 644 return true; 645 } 646 647 private void notifyFlushRequest(Region region, boolean emergencyFlush) { 648 FlushType type = null; 649 if (emergencyFlush) { 650 type = isAboveHighWaterMark(); 651 if (type == null) { 652 type = isAboveLowWaterMark(); 653 } 654 } 655 for (FlushRequestListener listener : flushRequestListeners) { 656 listener.flushRequested(type, region); 657 } 658 } 659 660 private void wakeUpIfBlocking() { 661 synchronized (blockSignal) { 662 blockSignal.notifyAll(); 663 } 664 } 665 666 private boolean isTooManyStoreFiles(Region region) { 667 668 // When compaction is disabled, the region is flushable 669 if (!region.getTableDescriptor().isCompactionEnabled()) { 670 return false; 671 } 672 673 for (Store store : region.getStores()) { 674 if (store.hasTooManyStoreFiles()) { 675 return true; 676 } 677 } 678 return false; 679 } 680 681 private int getStoreFileCount(Region region) { 682 int count = 0; 683 for (Store store : region.getStores()) { 684 count += store.getStorefilesCount(); 685 } 686 return count; 687 } 688 689 /** 690 * Check if the regionserver's memstore memory usage is greater than the 691 * limit. If so, flush regions with the biggest memstores until we're down 692 * to the lower limit. This method blocks callers until we're down to a safe 693 * amount of memstore consumption. 694 */ 695 public void reclaimMemStoreMemory() { 696 try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) { 697 FlushType flushType = isAboveHighWaterMark(); 698 if (flushType != FlushType.NORMAL) { 699 TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark."); 700 long start = EnvironmentEdgeManager.currentTime(); 701 long nextLogTimeMs = start; 702 synchronized (this.blockSignal) { 703 boolean blocked = false; 704 long startTime = 0; 705 boolean interrupted = false; 706 try { 707 flushType = isAboveHighWaterMark(); 708 while (flushType != FlushType.NORMAL && !server.isStopped()) { 709 server.cacheFlusher.setFlushType(flushType); 710 if (!blocked) { 711 startTime = EnvironmentEdgeManager.currentTime(); 712 if (!server.getRegionServerAccounting().isOffheap()) { 713 logMsg("global memstore heapsize", 714 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 715 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 716 } else { 717 switch (flushType) { 718 case ABOVE_OFFHEAP_HIGHER_MARK: 719 logMsg("the global offheap memstore datasize", 720 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), 721 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 722 break; 723 case ABOVE_ONHEAP_HIGHER_MARK: 724 logMsg("global memstore heapsize", 725 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 726 server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); 727 break; 728 default: 729 break; 730 } 731 } 732 } 733 blocked = true; 734 wakeupFlushThread(); 735 try { 736 // we should be able to wait forever, but we've seen a bug where 737 // we miss a notify, so put a 5 second bound on it at least. 738 blockSignal.wait(5 * 1000); 739 } catch (InterruptedException ie) { 740 LOG.warn("Interrupted while waiting"); 741 interrupted = true; 742 } 743 long nowMs = EnvironmentEdgeManager.currentTime(); 744 if (nowMs >= nextLogTimeMs) { 745 LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start); 746 nextLogTimeMs = nowMs + 1000; 747 } 748 flushType = isAboveHighWaterMark(); 749 } 750 } finally { 751 if (interrupted) { 752 Thread.currentThread().interrupt(); 753 } 754 } 755 756 if(blocked){ 757 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; 758 if(totalTime > 0){ 759 this.updatesBlockedMsHighWater.add(totalTime); 760 } 761 LOG.info("Unblocking updates for server " + server.toString()); 762 } 763 } 764 } else { 765 flushType = isAboveLowWaterMark(); 766 if (flushType != FlushType.NORMAL) { 767 server.cacheFlusher.setFlushType(flushType); 768 wakeupFlushThread(); 769 } 770 } 771 } 772 } 773 774 private void logMsg(String type, long val, long max) { 775 LOG.info("Blocking updates: {} {} is >= blocking {}", type, 776 TraditionalBinaryPrefix.long2String(val, "", 1), 777 TraditionalBinaryPrefix.long2String(max, "", 1)); 778 } 779 780 @Override 781 public String toString() { 782 return "flush_queue=" 783 + flushQueue.size(); 784 } 785 786 public String dumpQueue() { 787 StringBuilder queueList = new StringBuilder(); 788 queueList.append("Flush Queue Queue dump:\n"); 789 queueList.append(" Flush Queue:\n"); 790 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator(); 791 792 while(it.hasNext()){ 793 queueList.append(" "+it.next().toString()); 794 queueList.append("\n"); 795 } 796 797 return queueList.toString(); 798 } 799 800 /** 801 * Register a MemstoreFlushListener 802 * @param listener 803 */ 804 @Override 805 public void registerFlushRequestListener(final FlushRequestListener listener) { 806 this.flushRequestListeners.add(listener); 807 } 808 809 /** 810 * Unregister the listener from MemstoreFlushListeners 811 * @param listener 812 * @return true when passed listener is unregistered successfully. 813 */ 814 @Override 815 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) { 816 return this.flushRequestListeners.remove(listener); 817 } 818 819 /** 820 * Sets the global memstore limit to a new size. 821 * @param globalMemStoreSize 822 */ 823 @Override 824 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 825 this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize); 826 reclaimMemStoreMemory(); 827 } 828 829 interface FlushQueueEntry extends Delayed { 830 } 831 832 /** 833 * Datastructure used in the flush queue. Holds region and retry count. 834 * Keeps tabs on how old this object is. Implements {@link Delayed}. On 835 * construction, the delay is zero. When added to a delay queue, we'll come 836 * out near immediately. Call {@link #requeue(long)} passing delay in 837 * milliseconds before readding to delay queue if you want it to stay there 838 * a while. 839 */ 840 static class FlushRegionEntry implements FlushQueueEntry { 841 private final HRegion region; 842 843 private final long createTime; 844 private long whenToExpire; 845 private int requeueCount = 0; 846 847 private final boolean forceFlushAllStores; 848 849 private final FlushLifeCycleTracker tracker; 850 851 FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) { 852 this.region = r; 853 this.createTime = EnvironmentEdgeManager.currentTime(); 854 this.whenToExpire = this.createTime; 855 this.forceFlushAllStores = forceFlushAllStores; 856 this.tracker = tracker; 857 } 858 859 /** 860 * @param maximumWait 861 * @return True if we have been delayed > <code>maximumWait</code> milliseconds. 862 */ 863 public boolean isMaximumWait(final long maximumWait) { 864 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; 865 } 866 867 /** 868 * @return Count of times {@link #requeue(long)} was called; i.e this is 869 * number of times we've been requeued. 870 */ 871 public int getRequeueCount() { 872 return this.requeueCount; 873 } 874 875 /** 876 * @return whether we need to flush all stores. 877 */ 878 public boolean isForceFlushAllStores() { 879 return forceFlushAllStores; 880 } 881 882 public FlushLifeCycleTracker getTracker() { 883 return tracker; 884 } 885 886 /** 887 * @param when When to expire, when to come up out of the queue. 888 * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() 889 * to whatever you pass. 890 * @return This. 891 */ 892 public FlushRegionEntry requeue(final long when) { 893 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when; 894 this.requeueCount++; 895 return this; 896 } 897 898 @Override 899 public long getDelay(TimeUnit unit) { 900 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), 901 TimeUnit.MILLISECONDS); 902 } 903 904 @Override 905 public int compareTo(Delayed other) { 906 // Delay is compared first. If there is a tie, compare region's hash code 907 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - 908 other.getDelay(TimeUnit.MILLISECONDS)).intValue(); 909 if (ret != 0) { 910 return ret; 911 } 912 FlushQueueEntry otherEntry = (FlushQueueEntry) other; 913 return hashCode() - otherEntry.hashCode(); 914 } 915 916 @Override 917 public String toString() { 918 return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]"; 919 } 920 921 @Override 922 public int hashCode() { 923 int hash = (int) getDelay(TimeUnit.MILLISECONDS); 924 return hash ^ region.hashCode(); 925 } 926 927 @Override 928 public boolean equals(Object obj) { 929 if (this == obj) { 930 return true; 931 } 932 if (obj == null || getClass() != obj.getClass()) { 933 return false; 934 } 935 FlushRegionEntry other = (FlushRegionEntry) obj; 936 if (!Bytes.equals(this.region.getRegionInfo().getRegionName(), 937 other.region.getRegionInfo().getRegionName())) { 938 return false; 939 } 940 return compareTo(other) == 0; 941 } 942 } 943}