/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flushes the memstore cache on request.
 *
 * NOTE: The flush handlers run as plain threads rather than as a Chore so that their sleep
 * can be interrupted as soon as there is something to do, rather than waiting out the
 * invariant Chore sleep time.
 *
 * @see FlushRequester
 */
@InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  private Configuration conf;
  // These two data members go together. Any entry in the one must have
  // a corresponding entry in the other.
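  // New flush requests are registered in both structures while synchronized on regionsInQueue;
  // the WAKEUPFLUSH_INSTANCE token below is the only queue entry that never has a matching
  // regionsInQueue entry.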
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  private final long threadWakeFrequency;
  private final HRegionServer server;
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Object blockSignal = new Object();

  private long blockingWaitTime;
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);

  private FlushType flushType;

  /**
   * Singleton instance inserted into flush queue used for signaling.
   */
  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

    @Override
    public int compareTo(Delayed o) {
      return -1;
    }

    @Override
    public boolean equals(Object obj) {
      return obj == this;
    }

    @Override
    public int hashCode() {
      return 42;
    }
  };

  /**
   * @param conf the region server configuration
   * @param server the region server this memstore flusher runs for
   */
  public MemStoreFlusher(final Configuration conf,
      final HRegionServer server) {
    super();
    this.conf = conf;
    this.server = server;
    this.threadWakeFrequency =
        conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
        90000);
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    this.flushHandlers = new FlushHandler[handlerCount];
    LOG.info("globalMemStoreLimit="
        + TraditionalBinaryPrefix
            .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
        + ", globalMemStoreLimitLowMark="
        + TraditionalBinaryPrefix.long2String(
            this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
        + ", Offheap="
        + (this.server.getRegionServerAccounting().isOffheap()));
  }

  public LongAdder getUpdatesBlockedMsHighWater() {
    return this.updatesBlockedMsHighWater;
  }

  public void setFlushType(FlushType flushType) {
    this.flushType = flushType;
  }

  /**
   * The memstore across all regions has exceeded the low water mark. Pick
   * one region to flush and flush it synchronously (this is called from the
   * flush thread)
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure() {
    SortedMap<Long, HRegion> regionsBySize = null;
    switch (flushType) {
      case ABOVE_OFFHEAP_HIGHER_MARK:
      case ABOVE_OFFHEAP_LOWER_MARK:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
        break;
      case ABOVE_ONHEAP_HIGHER_MARK:
      case ABOVE_ONHEAP_LOWER_MARK:
      default:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
    }
    Set<HRegion> excludedRegions = new HashSet<>();

    double secondaryMultiplier
        = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles (might be null!)
      HRegion bestFlushableRegion =
          getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
      // Find the biggest region, total, even if it might have too many store files.
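      // (Such a region is still a candidate: the size comparison below only prefers it when it
      // is more than twice as big as the best flushable region, to avoid many tiny flushes.)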
      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
      // Find the biggest region that is a secondary region
      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
      if (bestAnyRegion == null) {
        // If bestAnyRegion is null, fall back to the best replica; it may be null too, which is
        // checked next.
        bestAnyRegion = bestRegionReplica;
      }
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;
      long bestAnyRegionSize;
      long bestFlushableRegionSize;
      switch (flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
          break;

        default:
          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
      }
      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " + "Region "
              + bestAnyRegion.getRegionInfo().getRegionNameAsString()
              + " has too many " + "store files, but is "
              + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
              + " vs best flushable region's "
              + TraditionalBinaryPrefix.long2String(
                  bestFlushableRegionSize, "", 1)
              + ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      long regionToFlushSize;
      long bestRegionReplicaSize;
      switch (flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
          break;

        default:
          regionToFlushSize = regionToFlush.getMemStoreDataSize();
          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
      }

      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
        // A concurrency issue (such as splitting region) may happen such that the online region
        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
        // getBiggestMemStoreRegion().
        // This means that we can come out of the loop.
        LOG.debug("Above memory mark but there is no flushable region");
        return false;
      }

      if (regionToFlush == null ||
          (bestRegionReplica != null &&
           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
           (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) {
        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
            " due to global heap pressure. Total memstore off heap size=" +
            TraditionalBinaryPrefix.long2String(
                server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
            " memstore heap size=" + TraditionalBinaryPrefix.long2String(
                server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
        if (!flushedOne) {
          LOG.info("Excluding secondary region " + bestRegionReplica +
              " - trying to find a different region to refresh files.");
          excludedRegions.add(bestRegionReplica);
        }
      } else {
        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
            "Flush type=" + flushType.toString() +
            ", Total Memstore Heap size=" +
            TraditionalBinaryPrefix.long2String(
                server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) +
            ", Total Memstore Off-Heap size=" +
            TraditionalBinaryPrefix.long2String(
                server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
            ", Region memstore size=" +
            TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
        flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);

        if (!flushedOne) {
          LOG.info("Excluding unflushable region " + regionToFlush +
              " - trying to find a different region to flush.");
          excludedRegions.add(regionToFlush);
        }
      }
    }
    return true;
  }

  /**
   * @return Return memstore offheap size, or 0 if <code>r</code> is null
   */
  private static long getMemStoreOffHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreOffHeapSize();
  }

  /**
   * @return Return memstore heap size, or 0 if <code>r</code> is null
   */
  private static long getMemStoreHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreHeapSize();
  }

  /**
   * @return Return memstore data size, or 0 if <code>r</code> is null
   */
  private static long getMemStoreDataSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreDataSize();
  }

  private class FlushHandler extends HasThread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
            FlushType type = isAboveLowWaterMark();
            if (type != FlushType.NORMAL) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + TraditionalBinaryPrefix.long2String(
                      server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
              // For offheap memstore, even if the lower water mark was breached due to heap
              // overhead we still select the regions based on the region's memstore data size.
              // TODO : If we want to decide based on heap overhead it can be done without
              // tracking it per region.
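              // flushOneForGlobalPressure() flushes the single best candidate (or refreshes
              // store files on a secondary replica) and returns false only when nothing
              // could be flushed.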
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }

  private void wakeupFlushThread() {
    if (wakeupPending.compareAndSet(false, true)) {
      flushQueue.add(WAKEUPFLUSH_INSTANCE);
    }
  }

  private HRegion getBiggestMemStoreRegion(
      SortedMap<Long, HRegion> regionsBySize,
      Set<HRegion> excludedRegions,
      boolean checkStoreFileCount) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (region.writestate.flushing || !region.writestate.writesEnabled) {
          continue;
        }

        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
          continue;
        }
        return region;
      }
    }
    return null;
  }

  private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
      Set<HRegion> excludedRegions) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
          continue;
        }

        return region;
      }
    }
    return null;
  }

  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
    try {
      return region.refreshStoreFiles();
    } catch (IOException e) {
      LOG.warn("Refreshing store files failed with exception", e);
    }
    return false;
  }

  /**
   * Check if global memory usage is above the high watermark.
   * @return the matching {@link FlushType}, or {@link FlushType#NORMAL} if we are below it
   */
  private FlushType isAboveHighWaterMark() {
    return server.getRegionServerAccounting().isAboveHighWaterMark();
  }

  /**
   * Check if global memory usage is above the low watermark.
   * @return the matching {@link FlushType}, or {@link FlushType#NORMAL} if we are below it
   */
  private FlushType isAboveLowWaterMark() {
    return server.getRegionServerAccounting().isAboveLowWaterMark();
  }

  @Override
  public void requestFlush(HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
    r.incrementFlushesQueuedCount();
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue. It'll come out near immediately.
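        // (A new FlushRegionEntry reports a delay of zero until requeue(long) is called,
        // because whenToExpire is initialized to createTime.)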
        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      } else {
        tracker.notExecuted("Flush already requested on " + r);
      }
    }
  }

  @Override
  public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
    r.incrementFlushesQueuedCount();
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
        FlushRegionEntry fqe =
            new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }

  public int getFlushQueueSize() {
    return flushQueue.size();
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    lock.writeLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null) flushHandler.interrupt();
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  synchronized void start(UncaughtExceptionHandler eh) {
    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
        server.getServerName().toShortString() + "-MemStoreFlusher", eh);
    for (int i = 0; i < flushHandlers.length; i++) {
      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
      flusherThreadFactory.newThread(flushHandlers[i]);
      flushHandlers[i].start();
    }
  }

  boolean isAlive() {
    for (FlushHandler flushHandler : flushHandlers) {
      if (flushHandler != null && flushHandler.isAlive()) {
        return true;
      }
    }
    return false;
  }

  void join() {
    for (FlushHandler flushHandler : flushHandlers) {
      if (flushHandler != null) {
        Threads.shutdown(flushHandler.getThread());
      }
    }
  }

  /**
   * A flushRegion that checks store file count. If too many, puts the flush
   * on the delay queue to retry later.
   * @param fqe the queue entry naming the region to flush
   * @return true if the region was successfully flushed, false otherwise. If
   *   false, there will be accompanying log messages explaining why the region was
   *   not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
            "ms on a compaction to clean up 'too many store files'; waited " +
            "long enough... proceeding with flush of " +
            region.getRegionInfo().getRegionNameAsString());
      } else {
        // If this is the first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
              region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
              this.blockingWaitTime);
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(region,
                  Thread.currentThread().getName());
            } catch (IOException e) {
              e = e instanceof RemoteException ?
                  ((RemoteException) e).unwrapRemoteException() : e;
              LOG.error("Cache flush failed for region " +
                  Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
            }
          }
        }

        // Put back on the queue. Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
  }

  /**
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   *   needs to be removed from the flush queue. If false, we were called
   *   from the main flusher run loop and got the entry to flush by calling
   *   poll on the flush queue (which removed it).
   * @param forceFlushAllStores whether we want to flush all stores.
   * @return true if the region was successfully flushed, false otherwise. If
   *   false, there will be accompanying log messages explaining why the region was
   *   not flushed.
   */
  private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
      FlushLifeCycleTracker tracker) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
        // Need to remove the region from the delay queue. When NOT an
        // emergencyFlush, the item was already removed via flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }

    tracker.beforeExecution();
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(region,
            Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error(
          "Cache flush failed"
              + (region != null ?
(" for region " + 629 Bytes.toStringBinary(region.getRegionInfo().getRegionName())) 630 : ""), ex); 631 if (!server.checkFileSystem()) { 632 return false; 633 } 634 } finally { 635 lock.readLock().unlock(); 636 wakeUpIfBlocking(); 637 tracker.afterExecution(); 638 } 639 return true; 640 } 641 642 private void notifyFlushRequest(Region region, boolean emergencyFlush) { 643 FlushType type = null; 644 if (emergencyFlush) { 645 type = isAboveHighWaterMark(); 646 if (type == null) { 647 type = isAboveLowWaterMark(); 648 } 649 } 650 for (FlushRequestListener listener : flushRequestListeners) { 651 listener.flushRequested(type, region); 652 } 653 } 654 655 private void wakeUpIfBlocking() { 656 synchronized (blockSignal) { 657 blockSignal.notifyAll(); 658 } 659 } 660 661 private boolean isTooManyStoreFiles(Region region) { 662 663 // When compaction is disabled, the region is flushable 664 if (!region.getTableDescriptor().isCompactionEnabled()) { 665 return false; 666 } 667 668 for (Store store : region.getStores()) { 669 if (store.hasTooManyStoreFiles()) { 670 return true; 671 } 672 } 673 return false; 674 } 675 676 private int getStoreFileCount(Region region) { 677 int count = 0; 678 for (Store store : region.getStores()) { 679 count += store.getStorefilesCount(); 680 } 681 return count; 682 } 683 684 /** 685 * Check if the regionserver's memstore memory usage is greater than the 686 * limit. If so, flush regions with the biggest memstores until we're down 687 * to the lower limit. This method blocks callers until we're down to a safe 688 * amount of memstore consumption. 689 */ 690 public void reclaimMemStoreMemory() { 691 try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) { 692 FlushType flushType = isAboveHighWaterMark(); 693 if (flushType != FlushType.NORMAL) { 694 TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark."); 695 long start = EnvironmentEdgeManager.currentTime(); 696 long nextLogTimeMs = start; 697 synchronized (this.blockSignal) { 698 boolean blocked = false; 699 long startTime = 0; 700 boolean interrupted = false; 701 try { 702 flushType = isAboveHighWaterMark(); 703 while (flushType != FlushType.NORMAL && !server.isStopped()) { 704 server.cacheFlusher.setFlushType(flushType); 705 if (!blocked) { 706 startTime = EnvironmentEdgeManager.currentTime(); 707 if (!server.getRegionServerAccounting().isOffheap()) { 708 logMsg("global memstore heapsize", 709 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 710 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 711 } else { 712 switch (flushType) { 713 case ABOVE_OFFHEAP_HIGHER_MARK: 714 logMsg("the global offheap memstore datasize", 715 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), 716 server.getRegionServerAccounting().getGlobalMemStoreLimit()); 717 break; 718 case ABOVE_ONHEAP_HIGHER_MARK: 719 logMsg("global memstore heapsize", 720 server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), 721 server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit()); 722 break; 723 default: 724 break; 725 } 726 } 727 } 728 blocked = true; 729 wakeupFlushThread(); 730 try { 731 // we should be able to wait forever, but we've seen a bug where 732 // we miss a notify, so put a 5 second bound on it at least. 
                blockSignal.wait(5 * 1000);
              } catch (InterruptedException ie) {
                LOG.warn("Interrupted while waiting");
                interrupted = true;
              }
              long nowMs = EnvironmentEdgeManager.currentTime();
              if (nowMs >= nextLogTimeMs) {
                LOG.warn("Memstore is above high water mark; updates have been blocked for {} ms",
                    nowMs - start);
                nextLogTimeMs = nowMs + 1000;
              }
              flushType = isAboveHighWaterMark();
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }

          if (blocked) {
            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
            if (totalTime > 0) {
              this.updatesBlockedMsHighWater.add(totalTime);
            }
            LOG.info("Unblocking updates for server " + server.toString());
          }
        }
      } else {
        flushType = isAboveLowWaterMark();
        if (flushType != FlushType.NORMAL) {
          server.cacheFlusher.setFlushType(flushType);
          wakeupFlushThread();
        }
      }
    }
  }

  private void logMsg(String type, long val, long max) {
    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
        TraditionalBinaryPrefix.long2String(val, "", 1),
        TraditionalBinaryPrefix.long2String(max, "", 1));
  }

  @Override
  public String toString() {
    return "flush_queue="
        + flushQueue.size();
  }

  public String dumpQueue() {
    StringBuilder queueList = new StringBuilder();
    queueList.append("Flush Queue dump:\n");
    queueList.append("  Flush Queue:\n");
    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();

    while (it.hasNext()) {
      queueList.append("    " + it.next().toString());
      queueList.append("\n");
    }

    return queueList.toString();
  }

  /**
   * Register a FlushRequestListener
   * @param listener the listener to register
   */
  @Override
  public void registerFlushRequestListener(final FlushRequestListener listener) {
    this.flushRequestListeners.add(listener);
  }

  /**
   * Unregister the given FlushRequestListener
   * @param listener the listener to unregister
   * @return true when the passed listener is unregistered successfully.
   */
  @Override
  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
    return this.flushRequestListeners.remove(listener);
  }

  /**
   * Sets the global memstore limit to a new size.
   * @param globalMemStoreSize the new global memstore size, in bytes
   */
  @Override
  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
    reclaimMemStoreMemory();
  }

  interface FlushQueueEntry extends Delayed {
  }

  /**
   * Datastructure used in the flush queue. Holds region and retry count.
   * Keeps tabs on how old this object is. Implements {@link Delayed}. On
   * construction, the delay is zero. When added to a delay queue, we'll come
   * out near immediately. Call {@link #requeue(long)} passing delay in
   * milliseconds before re-adding to the delay queue if you want it to stay
   * there a while.
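   * <p>
   * For example, a delayed retry ends up queued roughly like
   * {@code flushQueue.add(new FlushRegionEntry(r, false, FlushLifeCycleTracker.DUMMY).requeue(delayMs))},
   * which is effectively what {@link MemStoreFlusher#requestDelayedFlush(HRegion, long, boolean)}
   * does (the real method also records the entry in regionsInQueue; {@code delayMs} is just an
   * illustrative name for the requested delay).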
   */
  static class FlushRegionEntry implements FlushQueueEntry {
    private final HRegion region;

    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;

    private final boolean forceFlushAllStores;

    private final FlushLifeCycleTracker tracker;

    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
      this.forceFlushAllStores = forceFlushAllStores;
      this.tracker = tracker;
    }

    /**
     * @param maximumWait the maximum wait, in milliseconds
     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
     */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }

    /**
     * @return Count of times {@link #requeue(long)} was called; i.e. this is the
     *   number of times we've been requeued.
     */
    public int getRequeueCount() {
      return this.requeueCount;
    }

    /**
     * @return whether we need to flush all stores.
     */
    public boolean isForceFlushAllStores() {
      return forceFlushAllStores;
    }

    public FlushLifeCycleTracker getTracker() {
      return tracker;
    }

    /**
     * @param when When to expire, when to come up out of the queue.
     *   Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime()
     *   to whatever you pass.
     * @return This.
     */
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
          TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      // Delay is compared first. If there is a tie, compare region's hash code
      int ret = Long.compare(getDelay(TimeUnit.MILLISECONDS),
          other.getDelay(TimeUnit.MILLISECONDS));
      if (ret != 0) {
        return ret;
      }
      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
      return hashCode() - otherEntry.hashCode();
    }

    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
    }

    @Override
    public int hashCode() {
      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
      return hash ^ region.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      FlushRegionEntry other = (FlushRegionEntry) obj;
      if (!Bytes.equals(this.region.getRegionInfo().getRegionName(),
          other.region.getRegionInfo().getRegionName())) {
        return false;
      }
      return compareTo(other) == 0;
    }
  }
}