001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.lang.Thread.UncaughtExceptionHandler; 023import java.util.ArrayList; 024import java.util.ConcurrentModificationException; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.SortedMap; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.DelayQueue; 033import java.util.concurrent.Delayed; 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.LongAdder; 038import java.util.concurrent.locks.ReentrantReadWriteLock; 039 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.DroppedSnapshotException; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.client.RegionReplicaUtil; 044import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 045import org.apache.hadoop.hbase.trace.TraceUtil; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 049import org.apache.hadoop.hbase.util.Threads; 050import org.apache.hadoop.ipc.RemoteException; 051import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; 052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 053import org.apache.htrace.core.TraceScope; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * Thread that flushes cache on request 060 * 061 * NOTE: This class extends Thread rather than Chore because the sleep time 062 * can be interrupted when there is something to do, rather than the Chore 063 * sleep time which is invariant. 064 * 065 * @see FlushRequester 066 */ 067@InterfaceAudience.Private 068class MemStoreFlusher implements FlushRequester { 069 private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class); 070 071 private Configuration conf; 072 // These two data members go together. Any entry in the one must have 073 // a corresponding entry in the other. 074 private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>(); 075 private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>(); 076 private AtomicBoolean wakeupPending = new AtomicBoolean(); 077 078 private final long threadWakeFrequency; 079 private final HRegionServer server; 080 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 081 private final Object blockSignal = new Object(); 082 083 private long blockingWaitTime; 084 private final LongAdder updatesBlockedMsHighWater = new LongAdder(); 085 086 private final FlushHandler[] flushHandlers; 087 private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1); 088 089 /** 090 * Singleton instance inserted into flush queue used for signaling. 091 */ 092 private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() { 093 @Override 094 public long getDelay(TimeUnit unit) { 095 return 0; 096 } 097 098 @Override 099 public int compareTo(Delayed o) { 100 return -1; 101 } 102 103 @Override 104 public boolean equals(Object obj) { 105 return obj == this; 106 } 107 108 @Override 109 public int hashCode() { 110 return 42; 111 } 112 }; 113 114 115 /** 116 * @param conf 117 * @param server 118 */ 119 public MemStoreFlusher(final Configuration conf, 120 final HRegionServer server) { 121 super(); 122 this.conf = conf; 123 this.server = server; 124 this.threadWakeFrequency = 125 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 126 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 127 90000); 128 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); 129 if (handlerCount < 1) { 130 LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, corrected to 1", 131 handlerCount); 132 handlerCount = 1; 133 } 134 this.flushHandlers = new FlushHandler[handlerCount]; 135 LOG.info("globalMemStoreLimit=" 136 + TraditionalBinaryPrefix 137 .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) 138 + ", globalMemStoreLimitLowMark=" 139 + TraditionalBinaryPrefix.long2String( 140 this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) 141 + ", Offheap=" 142 + (this.server.getRegionServerAccounting().isOffheap())); 143 } 144 145 public LongAdder getUpdatesBlockedMsHighWater() { 146 return this.updatesBlockedMsHighWater; 147 } 148 149 /** 150 * The memstore across all regions has exceeded the low water mark. Pick 151 * one region to flush and flush it synchronously (this is called from the 152 * flush thread) 153 * @return true if successful 154 */ 155 private boolean flushOneForGlobalPressure(FlushType flushType) { 156 SortedMap<Long, HRegion> regionsBySize = null; 157 switch(flushType) { 158 case ABOVE_OFFHEAP_HIGHER_MARK: 159 case ABOVE_OFFHEAP_LOWER_MARK: 160 regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize(); 161 break; 162 case ABOVE_ONHEAP_HIGHER_MARK: 163 case ABOVE_ONHEAP_LOWER_MARK: 164 default: 165 regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize(); 166 } 167 Set<HRegion> excludedRegions = new HashSet<>(); 168 169 double secondaryMultiplier 170 = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf); 171 172 boolean flushedOne = false; 173 while (!flushedOne) { 174 // Find the biggest region that doesn't have too many storefiles (might be null!) 175 HRegion bestFlushableRegion = 176 getBiggestMemStoreRegion(regionsBySize, excludedRegions, true); 177 // Find the biggest region, total, even if it might have too many flushes. 178 HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false); 179 // Find the biggest region that is a secondary region 180 HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions); 181 if (bestAnyRegion == null) { 182 // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null 183 bestAnyRegion = bestRegionReplica; 184 } 185 if (bestAnyRegion == null) { 186 LOG.error("Above memory mark but there are no flushable regions!"); 187 return false; 188 } 189 190 HRegion regionToFlush; 191 long bestAnyRegionSize; 192 long bestFlushableRegionSize; 193 switch(flushType) { 194 case ABOVE_OFFHEAP_HIGHER_MARK: 195 case ABOVE_OFFHEAP_LOWER_MARK: 196 bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize(); 197 bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion); 198 break; 199 200 case ABOVE_ONHEAP_HIGHER_MARK: 201 case ABOVE_ONHEAP_LOWER_MARK: 202 bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize(); 203 bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion); 204 break; 205 206 default: 207 bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize(); 208 bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion); 209 } 210 if (bestAnyRegionSize > 2 * bestFlushableRegionSize) { 211 // Even if it's not supposed to be flushed, pick a region if it's more than twice 212 // as big as the best flushable one - otherwise when we're under pressure we make 213 // lots of little flushes and cause lots of compactions, etc, which just makes 214 // life worse! 215 if (LOG.isDebugEnabled()) { 216 LOG.debug("Under global heap pressure: " + "Region " 217 + bestAnyRegion.getRegionInfo().getRegionNameAsString() 218 + " has too many " + "store files, but is " 219 + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1) 220 + " vs best flushable region's " 221 + TraditionalBinaryPrefix.long2String( 222 bestFlushableRegionSize, "", 1) 223 + ". Choosing the bigger."); 224 } 225 regionToFlush = bestAnyRegion; 226 } else { 227 if (bestFlushableRegion == null) { 228 regionToFlush = bestAnyRegion; 229 } else { 230 regionToFlush = bestFlushableRegion; 231 } 232 } 233 234 long regionToFlushSize; 235 long bestRegionReplicaSize; 236 switch(flushType) { 237 case ABOVE_OFFHEAP_HIGHER_MARK: 238 case ABOVE_OFFHEAP_LOWER_MARK: 239 regionToFlushSize = regionToFlush.getMemStoreOffHeapSize(); 240 bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica); 241 break; 242 243 case ABOVE_ONHEAP_HIGHER_MARK: 244 case ABOVE_ONHEAP_LOWER_MARK: 245 regionToFlushSize = regionToFlush.getMemStoreHeapSize(); 246 bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica); 247 break; 248 249 default: 250 regionToFlushSize = regionToFlush.getMemStoreDataSize(); 251 bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica); 252 } 253 254 if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) { 255 // A concurrency issue (such as splitting region) may happen such that the online region 256 // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to 257 // getBiggestMemStoreRegion(). This means that we can come out of the loop 258 LOG.debug("Above memory mark but there is no flushable region"); 259 return false; 260 } 261 262 if (regionToFlush == null || 263 (bestRegionReplica != null && 264 ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) && 265 (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) { 266 LOG.info("Refreshing storefiles of region " + bestRegionReplica + 267 " due to global heap pressure. Total memstore off heap size=" + 268 TraditionalBinaryPrefix.long2String( 269 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + 270 " memstore heap size=" + TraditionalBinaryPrefix.long2String( 271 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)); 272 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); 273 if (!flushedOne) { 274 LOG.info("Excluding secondary region " + bestRegionReplica + 275 " - trying to find a different region to refresh files."); 276 excludedRegions.add(bestRegionReplica); 277 } 278 } else { 279 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " + 280 "Flush type=" + flushType.toString() + 281 ", Total Memstore Heap size=" + 282 TraditionalBinaryPrefix.long2String( 283 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) + 284 ", Total Memstore Off-Heap size=" + 285 TraditionalBinaryPrefix.long2String( 286 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) + 287 ", Region memstore size=" + 288 TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1)); 289 flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY); 290 291 if (!flushedOne) { 292 LOG.info("Excluding unflushable region " + regionToFlush + 293 " - trying to find a different region to flush."); 294 excludedRegions.add(regionToFlush); 295 } 296 } 297 } 298 return true; 299 } 300 301 /** 302 * @return Return memstore offheap size or null if <code>r</code> is null 303 */ 304 private static long getMemStoreOffHeapSize(HRegion r) { 305 return r == null? 0: r.getMemStoreOffHeapSize(); 306 } 307 308 /** 309 * @return Return memstore heap size or null if <code>r</code> is null 310 */ 311 private static long getMemStoreHeapSize(HRegion r) { 312 return r == null? 0: r.getMemStoreHeapSize(); 313 } 314 315 /** 316 * @return Return memstore data size or null if <code>r</code> is null 317 */ 318 private static long getMemStoreDataSize(HRegion r) { 319 return r == null? 0: r.getMemStoreDataSize(); 320 } 321 322 private class FlushHandler extends Thread { 323 324 private FlushHandler(String name) { 325 super(name); 326 } 327 328 @Override 329 public void run() { 330 while (!server.isStopped()) { 331 FlushQueueEntry fqe = null; 332 try { 333 wakeupPending.set(false); // allow someone to wake us up again 334 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); 335 if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) { 336 FlushType type = isAboveLowWaterMark(); 337 if (type != FlushType.NORMAL) { 338 LOG.debug("Flush thread woke up because memory above low water=" 339 + TraditionalBinaryPrefix.long2String( 340 server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)); 341 // For offheap memstore, even if the lower water mark was breached due to heap overhead 342 // we still select the regions based on the region's memstore data size. 343 // TODO : If we want to decide based on heap over head it can be done without tracking 344 // it per region. 345 if (!flushOneForGlobalPressure(type)) { 346 // Wasn't able to flush any region, but we're above low water mark 347 // This is unlikely to happen, but might happen when closing the 348 // entire server - another thread is flushing regions. We'll just 349 // sleep a little bit to avoid spinning, and then pretend that 350 // we flushed one, so anyone blocked will check again 351 Thread.sleep(1000); 352 wakeUpIfBlocking(); 353 } 354 // Enqueue another one of these tokens so we'll wake up again 355 wakeupFlushThread(); 356 } 357 continue; 358 } 359 FlushRegionEntry fre = (FlushRegionEntry) fqe; 360 if (!flushRegion(fre)) { 361 break; 362 } 363 } catch (InterruptedException ex) { 364 continue; 365 } catch (ConcurrentModificationException ex) { 366 continue; 367 } catch (Exception ex) { 368 LOG.error("Cache flusher failed for entry " + fqe, ex); 369 if (!server.checkFileSystem()) { 370 break; 371 } 372 } 373 } 374 synchronized (regionsInQueue) { 375 regionsInQueue.clear(); 376 flushQueue.clear(); 377 } 378 379 // Signal anyone waiting, so they see the close flag 380 wakeUpIfBlocking(); 381 LOG.info(getName() + " exiting"); 382 } 383 } 384 385 386 private void wakeupFlushThread() { 387 if (wakeupPending.compareAndSet(false, true)) { 388 flushQueue.add(WAKEUPFLUSH_INSTANCE); 389 } 390 } 391 392 private HRegion getBiggestMemStoreRegion( 393 SortedMap<Long, HRegion> regionsBySize, 394 Set<HRegion> excludedRegions, 395 boolean checkStoreFileCount) { 396 synchronized (regionsInQueue) { 397 for (HRegion region : regionsBySize.values()) { 398 if (excludedRegions.contains(region)) { 399 continue; 400 } 401 402 if (region.writestate.flushing || !region.writestate.writesEnabled) { 403 continue; 404 } 405 406 if (checkStoreFileCount && isTooManyStoreFiles(region)) { 407 continue; 408 } 409 return region; 410 } 411 } 412 return null; 413 } 414 415 private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize, 416 Set<HRegion> excludedRegions) { 417 synchronized (regionsInQueue) { 418 for (HRegion region : regionsBySize.values()) { 419 if (excludedRegions.contains(region)) { 420 continue; 421 } 422 423 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 424 continue; 425 } 426 427 return region; 428 } 429 } 430 return null; 431 } 432 433 private boolean refreshStoreFilesAndReclaimMemory(Region region) { 434 try { 435 return region.refreshStoreFiles(); 436 } catch (IOException e) { 437 LOG.warn("Refreshing store files failed with exception", e); 438 } 439 return false; 440 } 441 442 /** 443 * Return true if global memory usage is above the high watermark 444 */ 445 private FlushType isAboveHighWaterMark() { 446 return server.getRegionServerAccounting().isAboveHighWaterMark(); 447 } 448 449 /** 450 * Return true if we're above the low watermark 451 */ 452 private FlushType isAboveLowWaterMark() { 453 return server.getRegionServerAccounting().isAboveLowWaterMark(); 454 } 455 456 @Override 457 public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) { 458 return this.requestFlush(r, null, tracker); 459 } 460 461 @Override 462 public boolean requestFlush(HRegion r, List<byte[]> families, 463 FlushLifeCycleTracker tracker) { 464 synchronized (regionsInQueue) { 465 if (!regionsInQueue.containsKey(r)) { 466 // This entry has no delay so it will be added at the top of the flush 467 // queue. It'll come out near immediately. 468 FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); 469 this.regionsInQueue.put(r, fqe); 470 this.flushQueue.add(fqe); 471 r.incrementFlushesQueuedCount(); 472 return true; 473 } else { 474 tracker.notExecuted("Flush already requested on " + r); 475 return false; 476 } 477 } 478 } 479 480 @Override 481 public boolean requestDelayedFlush(HRegion r, long delay) { 482 synchronized (regionsInQueue) { 483 if (!regionsInQueue.containsKey(r)) { 484 // This entry has some delay 485 FlushRegionEntry fqe = 486 new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY); 487 fqe.requeue(delay); 488 this.regionsInQueue.put(r, fqe); 489 this.flushQueue.add(fqe); 490 r.incrementFlushesQueuedCount(); 491 return true; 492 } 493 return false; 494 } 495 } 496 497 public int getFlushQueueSize() { 498 return flushQueue.size(); 499 } 500 501 /** 502 * Only interrupt once it's done with a run through the work loop. 503 */ 504 void interruptIfNecessary() { 505 lock.writeLock().lock(); 506 try { 507 for (FlushHandler flushHander : flushHandlers) { 508 if (flushHander != null) flushHander.interrupt(); 509 } 510 } finally { 511 lock.writeLock().unlock(); 512 } 513 } 514 515 synchronized void start(UncaughtExceptionHandler eh) { 516 ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder() 517 .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d") 518 .setDaemon(true).setUncaughtExceptionHandler(eh).build(); 519 for (int i = 0; i < flushHandlers.length; i++) { 520 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i); 521 flusherThreadFactory.newThread(flushHandlers[i]); 522 flushHandlers[i].start(); 523 } 524 } 525 526 boolean isAlive() { 527 for (FlushHandler flushHander : flushHandlers) { 528 if (flushHander != null && flushHander.isAlive()) { 529 return true; 530 } 531 } 532 return false; 533 } 534 535 void join() { 536 for (FlushHandler flushHander : flushHandlers) { 537 if (flushHander != null) { 538 Threads.shutdown(flushHander); 539 } 540 } 541 } 542 543 /** 544 * A flushRegion that checks store file count. If too many, puts the flush 545 * on delay queue to retry later. 546 * @param fqe 547 * @return true if the region was successfully flushed, false otherwise. If 548 * false, there will be accompanying log messages explaining why the region was 549 * not flushed. 550 */ 551 private boolean flushRegion(final FlushRegionEntry fqe) { 552 HRegion region = fqe.region; 553 if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { 554 if (fqe.isMaximumWait(this.blockingWaitTime)) { 555 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) + 556 "ms on a compaction to clean up 'too many store files'; waited " + 557 "long enough... proceeding with flush of " + 558 region.getRegionInfo().getRegionNameAsString()); 559 } else { 560 // If this is first time we've been put off, then emit a log message. 561 if (fqe.getRequeueCount() <= 0) { 562 // Note: We don't impose blockingStoreFiles constraint on meta regions 563 LOG.warn("{} has too many store files({}); delaying flush up to {} ms", 564 region.getRegionInfo().getEncodedName(), getStoreFileCount(region), 565 this.blockingWaitTime); 566 if (!this.server.compactSplitThread.requestSplit(region)) { 567 try { 568 this.server.compactSplitThread.requestSystemCompaction(region, 569 Thread.currentThread().getName()); 570 } catch (IOException e) { 571 e = e instanceof RemoteException ? 572 ((RemoteException)e).unwrapRemoteException() : e; 573 LOG.error("Cache flush failed for region " + 574 Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e); 575 } 576 } 577 } 578 579 // Put back on the queue. Have it come back out of the queue 580 // after a delay of this.blockingWaitTime / 100 ms. 581 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); 582 // Tell a lie, it's not flushed but it's ok 583 return true; 584 } 585 } 586 return flushRegion(region, false, fqe.families, fqe.getTracker()); 587 } 588 589 /** 590 * Flush a region. 591 * @param region Region to flush. 592 * @param emergencyFlush Set if we are being force flushed. If true the region 593 * needs to be removed from the flush queue. If false, when we were called 594 * from the main flusher run loop and we got the entry to flush by calling 595 * poll on the flush queue (which removed it). 596 * @param families stores of region to flush. 597 * @return true if the region was successfully flushed, false otherwise. If 598 * false, there will be accompanying log messages explaining why the region was 599 * not flushed. 600 */ 601 private boolean flushRegion(HRegion region, boolean emergencyFlush, 602 List<byte[]> families, FlushLifeCycleTracker tracker) { 603 synchronized (this.regionsInQueue) { 604 FlushRegionEntry fqe = this.regionsInQueue.remove(region); 605 // Use the start time of the FlushRegionEntry if available 606 if (fqe != null && emergencyFlush) { 607 // Need to remove from region from delay queue. When NOT an 608 // emergencyFlush, then item was removed via a flushQueue.poll. 609 flushQueue.remove(fqe); 610 } 611 } 612 613 tracker.beforeExecution(); 614 lock.readLock().lock(); 615 try { 616 notifyFlushRequest(region, emergencyFlush); 617 FlushResult flushResult = region.flushcache(families, false, tracker); 618 boolean shouldCompact = flushResult.isCompactionNeeded(); 619 // We just want to check the size 620 boolean shouldSplit = region.checkSplit().isPresent(); 621 if (shouldSplit) { 622 this.server.compactSplitThread.requestSplit(region); 623 } else if (shouldCompact) { 624 server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); 625 } 626 } catch (DroppedSnapshotException ex) { 627 // Cache flush can fail in a few places. If it fails in a critical 628 // section, we get a DroppedSnapshotException and a replay of wal 629 // is required. Currently the only way to do this is a restart of 630 // the server. Abort because hdfs is probably bad (HBASE-644 is a case 631 // where hdfs was bad but passed the hdfs check). 632 server.abort("Replay of WAL required. Forcing server shutdown", ex); 633 return false; 634 } catch (IOException ex) { 635 ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex; 636 LOG.error( 637 "Cache flush failed" 638 + (region != null ? (" for region " + 639 Bytes.toStringBinary(region.getRegionInfo().getRegionName())) 640 : ""), ex); 641 if (!server.checkFileSystem()) { 642 return false; 643 } 644 } finally { 645 lock.readLock().unlock(); 646 wakeUpIfBlocking(); 647 tracker.afterExecution(); 648 } 649 return true; 650 } 651 652 private void notifyFlushRequest(Region region, boolean emergencyFlush) { 653 FlushType type = null; 654 if (emergencyFlush) { 655 type = isAboveHighWaterMark(); 656 if (type == null) { 657 type = isAboveLowWaterMark(); 658 } 659 } 660 for (FlushRequestListener listener : flushRequestListeners) { 661 listener.flushRequested(type, region); 662 } 663 } 664 665 private void wakeUpIfBlocking() { 666 synchronized (blockSignal) { 667 blockSignal.notifyAll(); 668 } 669 } 670 671 private boolean isTooManyStoreFiles(Region region) { 672 673 // When compaction is disabled, the region is flushable 674 if (!region.getTableDescriptor().isCompactionEnabled()) { 675 return false; 676 } 677 678 for (Store store : region.getStores()) { 679 if (store.hasTooManyStoreFiles()) { 680 return true; 681 } 682 } 683 return false; 684 } 685 686 private int getStoreFileCount(Region region) { 687 int count = 0; 688 for (Store store : region.getStores()) { 689 count += store.getStorefilesCount(); 690 } 691 return count; 692 } 693 694 /** 695 * Check if the regionserver's memstore memory usage is greater than the 696 * limit. If so, flush regions with the biggest memstores until we're down 697 * to the lower limit. This method blocks callers until we're down to a safe 698 * amount of memstore consumption. 699 */ 700 public void reclaimMemStoreMemory() { 701 try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) { 702 FlushType flushType = isAboveHighWaterMark(); 703 if (flushType != FlushType.NORMAL) { 704 TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark."); 705 long start = EnvironmentEdgeManager.currentTime(); 706 long nextLogTimeMs = start; 707 synchronized (this.blockSignal) { 708 boolean blocked = false; 709 long startTime = 0; 710 boolean interrupted = false; 711 try { 712 flushType = isAboveHighWaterMark(); 713 while (flushType != FlushType.NORMAL && !server.isStopped()) { 714 if (!blocked) { 715 startTime = EnvironmentEdgeManager.currentTime(); 716 if (!server.getRegionServerAccounting().isOffheap()) { 717 logMsg("global memstore heapsize", 718 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 719 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 720 } else { 721 switch (flushType) { 722 case ABOVE_OFFHEAP_HIGHER_MARK: 723 logMsg("the global offheap memstore datasize", 724 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), 725 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 726 break; 727 case ABOVE_ONHEAP_HIGHER_MARK: 728 logMsg("global memstore heapsize", 729 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 730 server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); 731 break; 732 default: 733 break; 734 } 735 } 736 } 737 blocked = true; 738 wakeupFlushThread(); 739 try { 740 // we should be able to wait forever, but we've seen a bug where 741 // we miss a notify, so put a 5 second bound on it at least. 742 blockSignal.wait(5 * 1000); 743 } catch (InterruptedException ie) { 744 LOG.warn("Interrupted while waiting"); 745 interrupted = true; 746 } 747 long nowMs = EnvironmentEdgeManager.currentTime(); 748 if (nowMs >= nextLogTimeMs) { 749 LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start); 750 nextLogTimeMs = nowMs + 1000; 751 } 752 flushType = isAboveHighWaterMark(); 753 } 754 } finally { 755 if (interrupted) { 756 Thread.currentThread().interrupt(); 757 } 758 } 759 760 if(blocked){ 761 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime; 762 if(totalTime > 0){ 763 this.updatesBlockedMsHighWater.add(totalTime); 764 } 765 LOG.info("Unblocking updates for server " + server.toString()); 766 } 767 } 768 } else { 769 flushType = isAboveLowWaterMark(); 770 if (flushType != FlushType.NORMAL) { 771 wakeupFlushThread(); 772 } 773 } 774 } 775 } 776 777 private void logMsg(String type, long val, long max) { 778 LOG.info("Blocking updates: {} {} is >= blocking {}", type, 779 TraditionalBinaryPrefix.long2String(val, "", 1), 780 TraditionalBinaryPrefix.long2String(max, "", 1)); 781 } 782 783 @Override 784 public String toString() { 785 return "flush_queue=" 786 + flushQueue.size(); 787 } 788 789 public String dumpQueue() { 790 StringBuilder queueList = new StringBuilder(); 791 queueList.append("Flush Queue Queue dump:\n"); 792 queueList.append(" Flush Queue:\n"); 793 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator(); 794 795 while(it.hasNext()){ 796 queueList.append(" "+it.next().toString()); 797 queueList.append("\n"); 798 } 799 800 return queueList.toString(); 801 } 802 803 /** 804 * Register a MemstoreFlushListener 805 * @param listener 806 */ 807 @Override 808 public void registerFlushRequestListener(final FlushRequestListener listener) { 809 this.flushRequestListeners.add(listener); 810 } 811 812 /** 813 * Unregister the listener from MemstoreFlushListeners 814 * @param listener 815 * @return true when passed listener is unregistered successfully. 816 */ 817 @Override 818 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) { 819 return this.flushRequestListeners.remove(listener); 820 } 821 822 /** 823 * Sets the global memstore limit to a new size. 824 * @param globalMemStoreSize 825 */ 826 @Override 827 public void setGlobalMemStoreLimit(long globalMemStoreSize) { 828 this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize); 829 reclaimMemStoreMemory(); 830 } 831 832 interface FlushQueueEntry extends Delayed { 833 } 834 835 /** 836 * Datastructure used in the flush queue. Holds region and retry count. 837 * Keeps tabs on how old this object is. Implements {@link Delayed}. On 838 * construction, the delay is zero. When added to a delay queue, we'll come 839 * out near immediately. Call {@link #requeue(long)} passing delay in 840 * milliseconds before readding to delay queue if you want it to stay there 841 * a while. 842 */ 843 static class FlushRegionEntry implements FlushQueueEntry { 844 private final HRegion region; 845 846 private final long createTime; 847 private long whenToExpire; 848 private int requeueCount = 0; 849 850 private final List<byte[]> families; 851 852 private final FlushLifeCycleTracker tracker; 853 854 FlushRegionEntry(final HRegion r, List<byte[]> families, 855 FlushLifeCycleTracker tracker) { 856 this.region = r; 857 this.createTime = EnvironmentEdgeManager.currentTime(); 858 this.whenToExpire = this.createTime; 859 this.families = families; 860 this.tracker = tracker; 861 } 862 863 /** 864 * @param maximumWait 865 * @return True if we have been delayed > <code>maximumWait</code> milliseconds. 866 */ 867 public boolean isMaximumWait(final long maximumWait) { 868 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; 869 } 870 871 /** 872 * @return Count of times {@link #requeue(long)} was called; i.e this is 873 * number of times we've been requeued. 874 */ 875 public int getRequeueCount() { 876 return this.requeueCount; 877 } 878 879 public FlushLifeCycleTracker getTracker() { 880 return tracker; 881 } 882 883 /** 884 * @param when When to expire, when to come up out of the queue. 885 * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() 886 * to whatever you pass. 887 * @return This. 888 */ 889 public FlushRegionEntry requeue(final long when) { 890 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when; 891 this.requeueCount++; 892 return this; 893 } 894 895 @Override 896 public long getDelay(TimeUnit unit) { 897 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), 898 TimeUnit.MILLISECONDS); 899 } 900 901 @Override 902 public int compareTo(Delayed other) { 903 // Delay is compared first. If there is a tie, compare region's hash code 904 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - 905 other.getDelay(TimeUnit.MILLISECONDS)).intValue(); 906 if (ret != 0) { 907 return ret; 908 } 909 FlushQueueEntry otherEntry = (FlushQueueEntry) other; 910 return hashCode() - otherEntry.hashCode(); 911 } 912 913 @Override 914 public String toString() { 915 return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]"; 916 } 917 918 @Override 919 public int hashCode() { 920 int hash = (int) getDelay(TimeUnit.MILLISECONDS); 921 return hash ^ region.hashCode(); 922 } 923 924 @Override 925 public boolean equals(Object obj) { 926 if (this == obj) { 927 return true; 928 } 929 if (obj == null || getClass() != obj.getClass()) { 930 return false; 931 } 932 FlushRegionEntry other = (FlushRegionEntry) obj; 933 if (!Bytes.equals(this.region.getRegionInfo().getRegionName(), 934 other.region.getRegionInfo().getRegionName())) { 935 return false; 936 } 937 return compareTo(other) == 0; 938 } 939 } 940}