001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.lang.Thread.UncaughtExceptionHandler; 023import java.util.ArrayList; 024import java.util.ConcurrentModificationException; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.SortedMap; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.DelayQueue; 033import java.util.concurrent.Delayed; 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.LongAdder; 038import java.util.concurrent.locks.ReentrantReadWriteLock; 039 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.DroppedSnapshotException; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.client.RegionReplicaUtil; 044import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 045import org.apache.hadoop.hbase.trace.TraceUtil; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 049import org.apache.hadoop.hbase.util.Threads; 050import org.apache.hadoop.ipc.RemoteException; 051import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; 052import org.apache.htrace.core.TraceScope; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * Thread that flushes cache on request 059 * 060 * NOTE: This class extends Thread rather than Chore because the sleep time 061 * can be interrupted when there is something to do, rather than the Chore 062 * sleep time which is invariant. 063 * 064 * @see FlushRequester 065 */ 066@InterfaceAudience.Private 067class MemStoreFlusher implements FlushRequester { 068 private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class); 069 070 private Configuration conf; 071 // These two data members go together. Any entry in the one must have 072 // a corresponding entry in the other. 073 private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>(); 074 private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>(); 075 private AtomicBoolean wakeupPending = new AtomicBoolean(); 076 077 private final long threadWakeFrequency; 078 private final HRegionServer server; 079 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 080 private final Object blockSignal = new Object(); 081 082 private long blockingWaitTime; 083 private final LongAdder updatesBlockedMsHighWater = new LongAdder(); 084 085 private final FlushHandler[] flushHandlers; 086 private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1); 087 088 /** 089 * Singleton instance inserted into flush queue used for signaling. 090 */ 091 private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() { 092 @Override 093 public long getDelay(TimeUnit unit) { 094 return 0; 095 } 096 097 @Override 098 public int compareTo(Delayed o) { 099 return -1; 100 } 101 102 @Override 103 public boolean equals(Object obj) { 104 return obj == this; 105 } 106 107 @Override 108 public int hashCode() { 109 return 42; 110 } 111 }; 112 113 114 /** 115 * @param conf 116 * @param server 117 */ 118 public MemStoreFlusher(final Configuration conf, 119 final HRegionServer server) { 120 super(); 121 this.conf = conf; 122 this.server = server; 123 this.threadWakeFrequency = 124 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 125 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 126 90000); 127 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); 128 if (handlerCount < 1) { 129 LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, corrected to 1", 130 handlerCount); 131 handlerCount = 1; 132 } 133 this.flushHandlers = new FlushHandler[handlerCount]; 134 LOG.info("globalMemStoreLimit=" 135 + TraditionalBinaryPrefix 136 .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) 137 + ", globalMemStoreLimitLowMark=" 138 + TraditionalBinaryPrefix.long2String( 139 this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) 140 + ", Offheap=" 141 + (this.server.getRegionServerAccounting().isOffheap())); 142 } 143 144 public LongAdder getUpdatesBlockedMsHighWater() { 145 return this.updatesBlockedMsHighWater; 146 } 147 148 /** 149 * The memstore across all regions has exceeded the low water mark. Pick 150 * one region to flush and flush it synchronously (this is called from the 151 * flush thread) 152 * @return true if successful 153 */ 154 private boolean flushOneForGlobalPressure(FlushType flushType) { 155 SortedMap<Long, HRegion> regionsBySize = null; 156 switch(flushType) { 157 case ABOVE_OFFHEAP_HIGHER_MARK: 158 case ABOVE_OFFHEAP_LOWER_MARK: 159 regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize(); 160 break; 161 case ABOVE_ONHEAP_HIGHER_MARK: 162 case ABOVE_ONHEAP_LOWER_MARK: 163 default: 164 regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize(); 165 } 166 Set<HRegion> excludedRegions = new HashSet<>(); 167 168 double secondaryMultiplier 169 = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf); 170 171 boolean flushedOne = false; 172 while (!flushedOne) { 173 // Find the biggest region that doesn't have too many storefiles (might be null!) 174 HRegion bestFlushableRegion = 175 getBiggestMemStoreRegion(regionsBySize, excludedRegions, true); 176 // Find the biggest region, total, even if it might have too many flushes. 177 HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false); 178 // Find the biggest region that is a secondary region 179 HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions); 180 if (bestAnyRegion == null) { 181 // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null 182 bestAnyRegion = bestRegionReplica; 183 } 184 if (bestAnyRegion == null) { 185 LOG.error("Above memory mark but there are no flushable regions!"); 186 return false; 187 } 188 189 HRegion regionToFlush; 190 long bestAnyRegionSize; 191 long bestFlushableRegionSize; 192 switch(flushType) { 193 case ABOVE_OFFHEAP_HIGHER_MARK: 194 case ABOVE_OFFHEAP_LOWER_MARK: 195 bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize(); 196 bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion); 197 break; 198 199 case ABOVE_ONHEAP_HIGHER_MARK: 200 case ABOVE_ONHEAP_LOWER_MARK: 201 bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize(); 202 bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion); 203 break; 204 205 default: 206 bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize(); 207 bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion); 208 } 209 if (bestAnyRegionSize > 2 * bestFlushableRegionSize) { 210 // Even if it's not supposed to be flushed, pick a region if it's more than twice 211 // as big as the best flushable one - otherwise when we're under pressure we make 212 // lots of little flushes and cause lots of compactions, etc, which just makes 213 // life worse! 214 if (LOG.isDebugEnabled()) { 215 LOG.debug("Under global heap pressure: " + "Region " 216 + bestAnyRegion.getRegionInfo().getRegionNameAsString() 217 + " has too many " + "store files, but is " 218 + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1) 219 + " vs best flushable region's " 220 + TraditionalBinaryPrefix.long2String( 221 bestFlushableRegionSize, "", 1) 222 + ". Choosing the bigger."); 223 } 224 regionToFlush = bestAnyRegion; 225 } else { 226 if (bestFlushableRegion == null) { 227 regionToFlush = bestAnyRegion; 228 } else { 229 regionToFlush = bestFlushableRegion; 230 } 231 } 232 233 long regionToFlushSize; 234 long bestRegionReplicaSize; 235 switch(flushType) { 236 case ABOVE_OFFHEAP_HIGHER_MARK: 237 case ABOVE_OFFHEAP_LOWER_MARK: 238 regionToFlushSize = regionToFlush.getMemStoreOffHeapSize(); 239 bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica); 240 break; 241 242 case ABOVE_ONHEAP_HIGHER_MARK: 243 case ABOVE_ONHEAP_LOWER_MARK: 244 regionToFlushSize = regionToFlush.getMemStoreHeapSize(); 245 bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica); 246 break; 247 248 default: 249 regionToFlushSize = regionToFlush.getMemStoreDataSize(); 250 bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica); 251 } 252 253 if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) { 254 // A concurrency issue (such as splitting region) may happen such that the online region 255 // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to 256 // getBiggestMemStoreRegion(). This means that we can come out of the loop 257 LOG.debug("Above memory mark but there is no flushable region"); 258 return false; 259 } 260 261 if (regionToFlush == null || 262 (bestRegionReplica != null && 263 ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) && 264 (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) { 265 LOG.info("Refreshing storefiles of region " + bestRegionReplica + 266 " due to global heap pressure. Total memstore off heap size=" + 267 TraditionalBinaryPrefix.long2String( 268 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + 269 " memstore heap size=" + TraditionalBinaryPrefix.long2String( 270 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)); 271 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); 272 if (!flushedOne) { 273 LOG.info("Excluding secondary region " + bestRegionReplica + 274 " - trying to find a different region to refresh files."); 275 excludedRegions.add(bestRegionReplica); 276 } 277 } else { 278 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " + 279 "Flush type=" + flushType.toString() + 280 ", Total Memstore Heap size=" + 281 TraditionalBinaryPrefix.long2String( 282 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) + 283 ", Total Memstore Off-Heap size=" + 284 TraditionalBinaryPrefix.long2String( 285 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + 286 ", Region memstore size=" + 287 TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1)); 288 flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY); 289 290 if (!flushedOne) { 291 LOG.info("Excluding unflushable region " + regionToFlush + 292 " - trying to find a different region to flush."); 293 excludedRegions.add(regionToFlush); 294 } 295 } 296 } 297 return true; 298 } 299 300 /** 301 * @return Return memstore offheap size or null if <code>r</code> is null 302 */ 303 private static long getMemStoreOffHeapSize(HRegion r) { 304 return r == null? 0: r.getMemStoreOffHeapSize(); 305 } 306 307 /** 308 * @return Return memstore heap size or null if <code>r</code> is null 309 */ 310 private static long getMemStoreHeapSize(HRegion r) { 311 return r == null? 0: r.getMemStoreHeapSize(); 312 } 313 314 /** 315 * @return Return memstore data size or null if <code>r</code> is null 316 */ 317 private static long getMemStoreDataSize(HRegion r) { 318 return r == null? 0: r.getMemStoreDataSize(); 319 } 320 321 private class FlushHandler extends Thread { 322 323 private FlushHandler(String name) { 324 super(name); 325 } 326 327 @Override 328 public void run() { 329 while (!server.isStopped()) { 330 FlushQueueEntry fqe = null; 331 try { 332 wakeupPending.set(false); // allow someone to wake us up again 333 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); 334 if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) { 335 FlushType type = isAboveLowWaterMark(); 336 if (type != FlushType.NORMAL) { 337 LOG.debug("Flush thread woke up because memory above low water=" 338 + TraditionalBinaryPrefix.long2String( 339 server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)); 340 // For offheap memstore, even if the lower water mark was breached due to heap overhead 341 // we still select the regions based on the region's memstore data size. 342 // TODO : If we want to decide based on heap over head it can be done without tracking 343 // it per region. 344 if (!flushOneForGlobalPressure(type)) { 345 // Wasn't able to flush any region, but we're above low water mark 346 // This is unlikely to happen, but might happen when closing the 347 // entire server - another thread is flushing regions. We'll just 348 // sleep a little bit to avoid spinning, and then pretend that 349 // we flushed one, so anyone blocked will check again 350 Thread.sleep(1000); 351 wakeUpIfBlocking(); 352 } 353 // Enqueue another one of these tokens so we'll wake up again 354 wakeupFlushThread(); 355 } 356 continue; 357 } 358 FlushRegionEntry fre = (FlushRegionEntry) fqe; 359 if (!flushRegion(fre)) { 360 break; 361 } 362 } catch (InterruptedException ex) { 363 continue; 364 } catch (ConcurrentModificationException ex) { 365 continue; 366 } catch (Exception ex) { 367 LOG.error("Cache flusher failed for entry " + fqe, ex); 368 if (!server.checkFileSystem()) { 369 break; 370 } 371 } 372 } 373 synchronized (regionsInQueue) { 374 regionsInQueue.clear(); 375 flushQueue.clear(); 376 } 377 378 // Signal anyone waiting, so they see the close flag 379 wakeUpIfBlocking(); 380 LOG.info(getName() + " exiting"); 381 } 382 } 383 384 385 private void wakeupFlushThread() { 386 if (wakeupPending.compareAndSet(false, true)) { 387 flushQueue.add(WAKEUPFLUSH_INSTANCE); 388 } 389 } 390 391 private HRegion getBiggestMemStoreRegion( 392 SortedMap<Long, HRegion> regionsBySize, 393 Set<HRegion> excludedRegions, 394 boolean checkStoreFileCount) { 395 synchronized (regionsInQueue) { 396 for (HRegion region : regionsBySize.values()) { 397 if (excludedRegions.contains(region)) { 398 continue; 399 } 400 401 if (region.writestate.flushing || !region.writestate.writesEnabled) { 402 continue; 403 } 404 405 if (checkStoreFileCount && isTooManyStoreFiles(region)) { 406 continue; 407 } 408 return region; 409 } 410 } 411 return null; 412 } 413 414 private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize, 415 Set<HRegion> excludedRegions) { 416 synchronized (regionsInQueue) { 417 for (HRegion region : regionsBySize.values()) { 418 if (excludedRegions.contains(region)) { 419 continue; 420 } 421 422 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 423 continue; 424 } 425 426 return region; 427 } 428 } 429 return null; 430 } 431 432 private boolean refreshStoreFilesAndReclaimMemory(Region region) { 433 try { 434 return region.refreshStoreFiles(); 435 } catch (IOException e) { 436 LOG.warn("Refreshing store files failed with exception", e); 437 } 438 return false; 439 } 440 441 /** 442 * Return true if global memory usage is above the high watermark 443 */ 444 private FlushType isAboveHighWaterMark() { 445 return server.getRegionServerAccounting().isAboveHighWaterMark(); 446 } 447 448 /** 449 * Return true if we're above the low watermark 450 */ 451 private FlushType isAboveLowWaterMark() { 452 return server.getRegionServerAccounting().isAboveLowWaterMark(); 453 } 454 455 @Override 456 public boolean requestFlush(HRegion r, boolean forceFlushAllStores, 457 FlushLifeCycleTracker tracker) { 458 synchronized (regionsInQueue) { 459 if (!regionsInQueue.containsKey(r)) { 460 // This entry has no delay so it will be added at the top of the flush 461 // queue. It'll come out near immediately. 462 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker); 463 this.regionsInQueue.put(r, fqe); 464 this.flushQueue.add(fqe); 465 r.incrementFlushesQueuedCount(); 466 return true; 467 } else { 468 tracker.notExecuted("Flush already requested on " + r); 469 return false; 470 } 471 } 472 } 473 474 @Override 475 public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { 476 synchronized (regionsInQueue) { 477 if (!regionsInQueue.containsKey(r)) { 478 // This entry has some delay 479 FlushRegionEntry fqe = 480 new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY); 481 fqe.requeue(delay); 482 this.regionsInQueue.put(r, fqe); 483 this.flushQueue.add(fqe); 484 r.incrementFlushesQueuedCount(); 485 return true; 486 } 487 return false; 488 } 489 } 490 491 public int getFlushQueueSize() { 492 return flushQueue.size(); 493 } 494 495 /** 496 * Only interrupt once it's done with a run through the work loop. 497 */ 498 void interruptIfNecessary() { 499 lock.writeLock().lock(); 500 try { 501 for (FlushHandler flushHander : flushHandlers) { 502 if (flushHander != null) flushHander.interrupt(); 503 } 504 } finally { 505 lock.writeLock().unlock(); 506 } 507 } 508 509 synchronized void start(UncaughtExceptionHandler eh) { 510 ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory( 511 server.getServerName().toShortString() + "-MemStoreFlusher", eh); 512 for (int i = 0; i < flushHandlers.length; i++) { 513 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i); 514 flusherThreadFactory.newThread(flushHandlers[i]); 515 flushHandlers[i].start(); 516 } 517 } 518 519 boolean isAlive() { 520 for (FlushHandler flushHander : flushHandlers) { 521 if (flushHander != null && flushHander.isAlive()) { 522 return true; 523 } 524 } 525 return false; 526 } 527 528 void join() { 529 for (FlushHandler flushHander : flushHandlers) { 530 if (flushHander != null) { 531 Threads.shutdown(flushHander); 532 } 533 } 534 } 535 536 /** 537 * A flushRegion that checks store file count. If too many, puts the flush 538 * on delay queue to retry later. 539 * @param fqe 540 * @return true if the region was successfully flushed, false otherwise. If 541 * false, there will be accompanying log messages explaining why the region was 542 * not flushed. 543 */ 544 private boolean flushRegion(final FlushRegionEntry fqe) { 545 HRegion region = fqe.region; 546 if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { 547 if (fqe.isMaximumWait(this.blockingWaitTime)) { 548 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) + 549 "ms on a compaction to clean up 'too many store files'; waited " + 550 "long enough... proceeding with flush of " + 551 region.getRegionInfo().getRegionNameAsString()); 552 } else { 553 // If this is first time we've been put off, then emit a log message. 554 if (fqe.getRequeueCount() <= 0) { 555 // Note: We don't impose blockingStoreFiles constraint on meta regions 556 LOG.warn("{} has too many store files({}); delaying flush up to {} ms", 557 region.getRegionInfo().getEncodedName(), getStoreFileCount(region), 558 this.blockingWaitTime); 559 if (!this.server.compactSplitThread.requestSplit(region)) { 560 try { 561 this.server.compactSplitThread.requestSystemCompaction(region, 562 Thread.currentThread().getName()); 563 } catch (IOException e) { 564 e = e instanceof RemoteException ? 565 ((RemoteException)e).unwrapRemoteException() : e; 566 LOG.error("Cache flush failed for region " + 567 Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e); 568 } 569 } 570 } 571 572 // Put back on the queue. Have it come back out of the queue 573 // after a delay of this.blockingWaitTime / 100 ms. 574 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); 575 // Tell a lie, it's not flushed but it's ok 576 return true; 577 } 578 } 579 return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker()); 580 } 581 582 /** 583 * Flush a region. 584 * @param region Region to flush. 585 * @param emergencyFlush Set if we are being force flushed. If true the region 586 * needs to be removed from the flush queue. If false, when we were called 587 * from the main flusher run loop and we got the entry to flush by calling 588 * poll on the flush queue (which removed it). 589 * @param forceFlushAllStores whether we want to flush all store. 590 * @return true if the region was successfully flushed, false otherwise. If 591 * false, there will be accompanying log messages explaining why the region was 592 * not flushed. 593 */ 594 private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores, 595 FlushLifeCycleTracker tracker) { 596 synchronized (this.regionsInQueue) { 597 FlushRegionEntry fqe = this.regionsInQueue.remove(region); 598 // Use the start time of the FlushRegionEntry if available 599 if (fqe != null && emergencyFlush) { 600 // Need to remove from region from delay queue. When NOT an 601 // emergencyFlush, then item was removed via a flushQueue.poll. 602 flushQueue.remove(fqe); 603 } 604 } 605 606 tracker.beforeExecution(); 607 lock.readLock().lock(); 608 try { 609 notifyFlushRequest(region, emergencyFlush); 610 FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker); 611 boolean shouldCompact = flushResult.isCompactionNeeded(); 612 // We just want to check the size 613 boolean shouldSplit = region.checkSplit() != null; 614 if (shouldSplit) { 615 this.server.compactSplitThread.requestSplit(region); 616 } else if (shouldCompact) { 617 server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); 618 } 619 } catch (DroppedSnapshotException ex) { 620 // Cache flush can fail in a few places. If it fails in a critical 621 // section, we get a DroppedSnapshotException and a replay of wal 622 // is required. Currently the only way to do this is a restart of 623 // the server. Abort because hdfs is probably bad (HBASE-644 is a case 624 // where hdfs was bad but passed the hdfs check). 625 server.abort("Replay of WAL required. Forcing server shutdown", ex); 626 return false; 627 } catch (IOException ex) { 628 ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 629 LOG.error( 630 "Cache flush failed" 631 + (region != null ? (" for region " + 632 Bytes.toStringBinary(region.getRegionInfo().getRegionName())) 633 : ""), ex); 634 if (!server.checkFileSystem()) { 635 return false; 636 } 637 } finally { 638 lock.readLock().unlock(); 639 wakeUpIfBlocking(); 640 tracker.afterExecution(); 641 } 642 return true; 643 } 644 645 private void notifyFlushRequest(Region region, boolean emergencyFlush) { 646 FlushType type = null; 647 if (emergencyFlush) { 648 type = isAboveHighWaterMark(); 649 if (type == null) { 650 type = isAboveLowWaterMark(); 651 } 652 } 653 for (FlushRequestListener listener : flushRequestListeners) { 654 listener.flushRequested(type, region); 655 } 656 } 657 658 private void wakeUpIfBlocking() { 659 synchronized (blockSignal) { 660 blockSignal.notifyAll(); 661 } 662 } 663 664 private boolean isTooManyStoreFiles(Region region) { 665 666 // When compaction is disabled, the region is flushable 667 if (!region.getTableDescriptor().isCompactionEnabled()) { 668 return false; 669 } 670 671 for (Store store : region.getStores()) { 672 if (store.hasTooManyStoreFiles()) { 673 return true; 674 } 675 } 676 return false; 677 } 678 679 private int getStoreFileCount(Region region) { 680 int count = 0; 681 for (Store store : region.getStores()) { 682 count += store.getStorefilesCount(); 683 } 684 return count; 685 } 686 687 /** 688 * Check if the regionserver's memstore memory usage is greater than the 689 * limit. If so, flush regions with the biggest memstores until we're down 690 * to the lower limit. This method blocks callers until we're down to a safe 691 * amount of memstore consumption. 692 */ 693 public void reclaimMemStoreMemory() { 694 try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) { 695 FlushType flushType = isAboveHighWaterMark(); 696 if (flushType != FlushType.NORMAL) { 697 TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark."); 698 long start = EnvironmentEdgeManager.currentTime(); 699 long nextLogTimeMs = start; 700 synchronized (this.blockSignal) { 701 boolean blocked = false; 702 long startTime = 0; 703 boolean interrupted = false; 704 try { 705 flushType = isAboveHighWaterMark(); 706 while (flushType != FlushType.NORMAL && !server.isStopped()) { 707 if (!blocked) { 708 startTime = EnvironmentEdgeManager.currentTime(); 709 if (!server.getRegionServerAccounting().isOffheap()) { 710 logMsg("global memstore heapsize", 711 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 712 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 713 } else { 714 switch (flushType) { 715 case ABOVE_OFFHEAP_HIGHER_MARK: 716 logMsg("the global offheap memstore datasize", 717 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), 718 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 719 break; 720 case ABOVE_ONHEAP_HIGHER_MARK: 721 logMsg("global memstore heapsize", 722 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 723 server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); 724 break; 725 default: 726 break; 727 } 728 } 729 } 730 blocked = true; 731 wakeupFlushThread(); 732 try { 733 // we should be able to wait forever, but we've seen a bug where 734 // we miss a notify, so put a 5 second bound on it at least. 735 blockSignal.wait(5 * 1000); 736 } catch (InterruptedException ie) { 737 LOG.warn("Interrupted while waiting"); 738 interrupted = true; 739 } 740 long nowMs = EnvironmentEdgeManager.currentTime(); 741 if (nowMs >= nextLogTimeMs) { 742 LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start); 743 nextLogTimeMs = nowMs + 1000; 744 } 745 flushType = isAboveHighWaterMark(); 746 } 747 } finally { 748 if (interrupted) { 749 Thread.currentThread().interrupt(); 750 } 751 } 752 753 if(blocked){ 754 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; 755 if(totalTime > 0){ 756 this.updatesBlockedMsHighWater.add(totalTime); 757 } 758 LOG.info("Unblocking updates for server " + server.toString()); 759 } 760 } 761 } else { 762 flushType = isAboveLowWaterMark(); 763 if (flushType != FlushType.NORMAL) { 764 wakeupFlushThread(); 765 } 766 } 767 } 768 } 769 770 private void logMsg(String type, long val, long max) { 771 LOG.info("Blocking updates: {} {} is >= blocking {}", type, 772 TraditionalBinaryPrefix.long2String(val, "", 1), 773 TraditionalBinaryPrefix.long2String(max, "", 1)); 774 } 775 776 @Override 777 public String toString() { 778 return "flush_queue=" 779 + flushQueue.size(); 780 } 781 782 public String dumpQueue() { 783 StringBuilder queueList = new StringBuilder(); 784 queueList.append("Flush Queue Queue dump:\n"); 785 queueList.append(" Flush Queue:\n"); 786 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator(); 787 788 while(it.hasNext()){ 789 queueList.append(" "+it.next().toString()); 790 queueList.append("\n"); 791 } 792 793 return queueList.toString(); 794 } 795 796 /** 797 * Register a MemstoreFlushListener 798 * @param listener 799 */ 800 @Override 801 public void registerFlushRequestListener(final FlushRequestListener listener) { 802 this.flushRequestListeners.add(listener); 803 } 804 805 /** 806 * Unregister the listener from MemstoreFlushListeners 807 * @param listener 808 * @return true when passed listener is unregistered successfully. 809 */ 810 @Override 811 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) { 812 return this.flushRequestListeners.remove(listener); 813 } 814 815 /** 816 * Sets the global memstore limit to a new size. 817 * @param globalMemStoreSize 818 */ 819 @Override 820 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 821 this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize); 822 reclaimMemStoreMemory(); 823 } 824 825 interface FlushQueueEntry extends Delayed { 826 } 827 828 /** 829 * Datastructure used in the flush queue. Holds region and retry count. 830 * Keeps tabs on how old this object is. Implements {@link Delayed}. On 831 * construction, the delay is zero. When added to a delay queue, we'll come 832 * out near immediately. Call {@link #requeue(long)} passing delay in 833 * milliseconds before readding to delay queue if you want it to stay there 834 * a while. 835 */ 836 static class FlushRegionEntry implements FlushQueueEntry { 837 private final HRegion region; 838 839 private final long createTime; 840 private long whenToExpire; 841 private int requeueCount = 0; 842 843 private final boolean forceFlushAllStores; 844 845 private final FlushLifeCycleTracker tracker; 846 847 FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) { 848 this.region = r; 849 this.createTime = EnvironmentEdgeManager.currentTime(); 850 this.whenToExpire = this.createTime; 851 this.forceFlushAllStores = forceFlushAllStores; 852 this.tracker = tracker; 853 } 854 855 /** 856 * @param maximumWait 857 * @return True if we have been delayed > <code>maximumWait</code> milliseconds. 858 */ 859 public boolean isMaximumWait(final long maximumWait) { 860 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; 861 } 862 863 /** 864 * @return Count of times {@link #requeue(long)} was called; i.e this is 865 * number of times we've been requeued. 866 */ 867 public int getRequeueCount() { 868 return this.requeueCount; 869 } 870 871 /** 872 * @return whether we need to flush all stores. 873 */ 874 public boolean isForceFlushAllStores() { 875 return forceFlushAllStores; 876 } 877 878 public FlushLifeCycleTracker getTracker() { 879 return tracker; 880 } 881 882 /** 883 * @param when When to expire, when to come up out of the queue. 884 * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() 885 * to whatever you pass. 886 * @return This. 887 */ 888 public FlushRegionEntry requeue(final long when) { 889 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when; 890 this.requeueCount++; 891 return this; 892 } 893 894 @Override 895 public long getDelay(TimeUnit unit) { 896 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), 897 TimeUnit.MILLISECONDS); 898 } 899 900 @Override 901 public int compareTo(Delayed other) { 902 // Delay is compared first. If there is a tie, compare region's hash code 903 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - 904 other.getDelay(TimeUnit.MILLISECONDS)).intValue(); 905 if (ret != 0) { 906 return ret; 907 } 908 FlushQueueEntry otherEntry = (FlushQueueEntry) other; 909 return hashCode() - otherEntry.hashCode(); 910 } 911 912 @Override 913 public String toString() { 914 return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]"; 915 } 916 917 @Override 918 public int hashCode() { 919 int hash = (int) getDelay(TimeUnit.MILLISECONDS); 920 return hash ^ region.hashCode(); 921 } 922 923 @Override 924 public boolean equals(Object obj) { 925 if (this == obj) { 926 return true; 927 } 928 if (obj == null || getClass() != obj.getClass()) { 929 return false; 930 } 931 FlushRegionEntry other = (FlushRegionEntry) obj; 932 if (!Bytes.equals(this.region.getRegionInfo().getRegionName(), 933 other.region.getRegionInfo().getRegionName())) { 934 return false; 935 } 936 return compareTo(other) == 0; 937 } 938 } 939}