/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flushes the memstore cache on request.
 *
 * NOTE: The flush handlers spawned here extend Thread rather than using a Chore because
 * their sleep time can be interrupted when there is something to do, rather than the
 * Chore sleep time which is invariant.
 *
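 * <p>Illustrative sketch only (the real call sites live elsewhere in the region server);
 * a caller typically queues a flush roughly like:
 * <pre>
 *   // queue a non-forced flush of a region; one of the FlushHandler threads
 *   // will pick the request up and run the flush
 *   memStoreFlusher.requestFlush(region, false, FlushLifeCycleTracker.DUMMY);
 * </pre>
 *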
 * @see FlushRequester
 */
@InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  private Configuration conf;
  // These two data members go together.  Any entry in the one must have
  // a corresponding entry in the other.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  private final long threadWakeFrequency;
  private final HRegionServer server;
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Object blockSignal = new Object();

  private long blockingWaitTime;
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);

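  // The kind of memory pressure (on-heap vs. off-heap, high vs. low water mark) that most
  // recently triggered the flusher; set via setFlushType() and consulted by
  // flushOneForGlobalPressure() when deciding which memstore size to compare.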
  private FlushType flushType;

  /**
   * Singleton instance inserted into the flush queue, used to signal the flush handler
   * threads to wake up.
   */
  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

    @Override
    public int compareTo(Delayed o) {
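      // Always claim to be less than everything else so this wakeup token sorts
      // to the head of the delay queue and is polled before any region entry.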
      return -1;
    }

    @Override
    public boolean equals(Object obj) {
      return obj == this;
    }

    @Override
    public int hashCode() {
      return 42;
    }
  };

  /**
   * @param conf the region server's configuration
   * @param server the region server this flusher runs for
   */
  public MemStoreFlusher(final Configuration conf,
      final HRegionServer server) {
    super();
    this.conf = conf;
    this.server = server;
    this.threadWakeFrequency =
        conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
      90000);
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    this.flushHandlers = new FlushHandler[handlerCount];
    LOG.info("globalMemStoreLimit="
        + TraditionalBinaryPrefix
            .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
        + ", globalMemStoreLimitLowMark="
        + TraditionalBinaryPrefix.long2String(
          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
        + ", Offheap="
        + (this.server.getRegionServerAccounting().isOffheap()));
  }

  public LongAdder getUpdatesBlockedMsHighWater() {
    return this.updatesBlockedMsHighWater;
  }

  public void setFlushType(FlushType flushType) {
    this.flushType = flushType;
  }

  /**
   * The memstore across all regions has exceeded the low water mark. Pick
   * one region to flush and flush it synchronously (this is called from the
   * flush thread)
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure() {
    SortedMap<Long, HRegion> regionsBySize = null;
    switch(flushType) {
      case ABOVE_OFFHEAP_HIGHER_MARK:
      case ABOVE_OFFHEAP_LOWER_MARK:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
        break;
      case ABOVE_ONHEAP_HIGHER_MARK:
      case ABOVE_ONHEAP_LOWER_MARK:
      default:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
    }
    Set<HRegion> excludedRegions = new HashSet<>();

    double secondaryMultiplier
      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles (might be null!)
      HRegion bestFlushableRegion =
          getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
      // Find the biggest region, period, even if it might have too many store files.
      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
      // Find the biggest region that is a secondary region
      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
      if (bestAnyRegion == null) {
        // If bestAnyRegion is null, fall back to the best replica. It may be null too;
        // the next check handles that.
        bestAnyRegion = bestRegionReplica;
      }
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;
      long bestAnyRegionSize;
      long bestFlushableRegionSize;
      switch(flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
          break;

        default:
          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
      }
      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " + "Region "
              + bestAnyRegion.getRegionInfo().getRegionNameAsString()
              + " has too many " + "store files, but is "
              + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
              + " vs best flushable region's "
              + TraditionalBinaryPrefix.long2String(
              bestFlushableRegionSize, "", 1)
              + ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      long regionToFlushSize;
      long bestRegionReplicaSize;
      switch(flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
          break;

        default:
          regionToFlushSize = regionToFlush.getMemStoreDataSize();
          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
      }

      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
        // A concurrency issue (such as a region split) may mean that an online region seen
        // by getCopyOfOnlineRegionsSortedByXX() is no longer eligible for
        // getBiggestMemStoreRegion(). In that case we can fall out of the loop.
        LOG.debug("Above memory mark but there is no flushable region");
        return false;
      }

      if (regionToFlush == null ||
          (bestRegionReplica != null &&
           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
           (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) {
        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
            " due to global heap pressure. Total memstore off heap size=" +
            TraditionalBinaryPrefix.long2String(
              server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
            " memstore heap size=" + TraditionalBinaryPrefix.long2String(
              server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
        if (!flushedOne) {
          LOG.info("Excluding secondary region " + bestRegionReplica +
              " - trying to find a different region to refresh files.");
          excludedRegions.add(bestRegionReplica);
        }
      } else {
        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
            "Flush type=" + flushType.toString() +
            ", Total Memstore Heap size=" +
            TraditionalBinaryPrefix.long2String(
                server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) +
            ", Total Memstore Off-Heap size=" +
            TraditionalBinaryPrefix.long2String(
                server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
            ", Region memstore size=" +
            TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
        flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);

        if (!flushedOne) {
          LOG.info("Excluding unflushable region " + regionToFlush +
              " - trying to find a different region to flush.");
          excludedRegions.add(regionToFlush);
        }
      }
    }
    return true;
  }

  /**
   * @return the memstore off-heap size of <code>r</code>, or 0 if <code>r</code> is null
   */
  private static long getMemStoreOffHeapSize(HRegion r) {
    return r == null? 0: r.getMemStoreOffHeapSize();
  }

  /**
   * @return the memstore heap size of <code>r</code>, or 0 if <code>r</code> is null
   */
  private static long getMemStoreHeapSize(HRegion r) {
    return r == null? 0: r.getMemStoreHeapSize();
  }

  /**
   * @return the memstore data size of <code>r</code>, or 0 if <code>r</code> is null
   */
  private static long getMemStoreDataSize(HRegion r) {
    return r == null? 0: r.getMemStoreDataSize();
  }

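  /**
   * Worker thread that drains the flush queue: a FlushRegionEntry triggers a flush of that
   * region, while a wakeup token (or a poll timeout) triggers a check of the global memstore
   * pressure and, if needed, a flush of the biggest region via flushOneForGlobalPressure().
   */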
  private class FlushHandler extends HasThread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
            FlushType type = isAboveLowWaterMark();
            if (type != FlushType.NORMAL) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + TraditionalBinaryPrefix.long2String(
                    server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
              // For offheap memstore, even if the lower water mark was breached due to heap
              // overhead we still select the regions based on the region's memstore data size.
              // TODO : If we want to decide based on heap overhead it can be done without
              // tracking it per region.
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }

  private void wakeupFlushThread() {
    if (wakeupPending.compareAndSet(false, true)) {
      flushQueue.add(WAKEUPFLUSH_INSTANCE);
    }
  }

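  /**
   * Walk the supplied regions (expected to be sorted with the biggest memstore first) and return
   * the first one that is not excluded, is not already flushing, has writes enabled and, if
   * <code>checkStoreFileCount</code> is set, does not have too many store files. Returns null if
   * no region qualifies.
   */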
  private HRegion getBiggestMemStoreRegion(
      SortedMap<Long, HRegion> regionsBySize,
      Set<HRegion> excludedRegions,
      boolean checkStoreFileCount) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (region.writestate.flushing || !region.writestate.writesEnabled) {
          continue;
        }

        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
          continue;
        }
        return region;
      }
    }
    return null;
  }

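  /**
   * Like getBiggestMemStoreRegion(), but only considers secondary (non-default) region replicas;
   * the store file count is not checked. Returns null if no such replica is found.
   */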
  private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
      Set<HRegion> excludedRegions) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
          continue;
        }

        return region;
      }
    }
    return null;
  }

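  /**
   * Ask the given (secondary replica) region to refresh its store files, which lets it pick up
   * files written by the primary and drop the corresponding memstore content. Returns false if
   * the refresh fails.
   */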
  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
    try {
      return region.refreshStoreFiles();
    } catch (IOException e) {
      LOG.warn("Refreshing store files failed with exception", e);
    }
    return false;
  }

  /**
   * @return the flush type if global memory usage is above the high watermark,
   *   FlushType.NORMAL otherwise
   */
  private FlushType isAboveHighWaterMark() {
    return server.getRegionServerAccounting().isAboveHighWaterMark();
  }

  /**
   * @return the flush type if global memory usage is above the low watermark,
   *   FlushType.NORMAL otherwise
   */
  private FlushType isAboveLowWaterMark() {
    return server.getRegionServerAccounting().isAboveLowWaterMark();
  }

  @Override
  public void requestFlush(HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
    r.incrementFlushesQueuedCount();
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue. It'll come out near immediately.
        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      } else {
        tracker.notExecuted("Flush already requested on " + r);
      }
    }
  }

  @Override
  public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
    r.incrementFlushesQueuedCount();
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
        FlushRegionEntry fqe =
            new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }

  public int getFlushQueueSize() {
    return flushQueue.size();
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    lock.writeLock().lock();
    try {
      for (FlushHandler flushHander : flushHandlers) {
        if (flushHander != null) flushHander.interrupt();
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

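  /**
   * Start the flush handler threads (one per "hbase.hstore.flusher.count"); the given
   * UncaughtExceptionHandler is handed to the daemon thread factory used for them.
   */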
  synchronized void start(UncaughtExceptionHandler eh) {
    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
        server.getServerName().toShortString() + "-MemStoreFlusher", eh);
    for (int i = 0; i < flushHandlers.length; i++) {
      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
      flusherThreadFactory.newThread(flushHandlers[i]);
      flushHandlers[i].start();
    }
  }

  boolean isAlive() {
    for (FlushHandler flushHander : flushHandlers) {
      if (flushHander != null && flushHander.isAlive()) {
        return true;
      }
    }
    return false;
  }

  void join() {
    for (FlushHandler flushHander : flushHandlers) {
      if (flushHander != null) {
        Threads.shutdown(flushHander.getThread());
      }
    }
  }

  /**
   * A flushRegion that checks store file count.  If too many, puts the flush
   * back on the delay queue to retry later.
   * @param fqe the queued flush request
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionInfo().getRegionNameAsString());
      } else {
        // If this is the first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
              region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
              this.blockingWaitTime);
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(region,
                Thread.currentThread().getName());
            } catch (IOException e) {
              e = e instanceof RemoteException ?
                      ((RemoteException)e).unwrapRemoteException() : e;
              LOG.error("Cache flush failed for region " +
                Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
            }
          }
        }

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
  }

  /**
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, we were called from the
   * main flusher run loop and the entry was already removed from the flush queue
   * by the poll that handed it to us.
   * @param forceFlushAllStores whether we want to flush all stores.
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
      FlushLifeCycleTracker tracker) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
        // Need to remove the region from the delay queue. When NOT an
        // emergencyFlush, the entry was already removed via flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }

    tracker.beforeExecution();
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error(
        "Cache flush failed"
            + (region != null ? (" for region " +
                Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
              : ""), ex);
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
      tracker.afterExecution();
    }
    return true;
  }

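  /**
   * Notify registered FlushRequestListeners of the flush. For an emergency flush the listeners
   * are also told which watermark (high or low) drove it; otherwise the type is left null.
   */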
  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
    FlushType type = null;
    if (emergencyFlush) {
      type = isAboveHighWaterMark();
      if (type == FlushType.NORMAL) {
        type = isAboveLowWaterMark();
      }
    }
    for (FlushRequestListener listener : flushRequestListeners) {
      listener.flushRequested(type, region);
    }
  }

  private void wakeUpIfBlocking() {
    synchronized (blockSignal) {
      blockSignal.notifyAll();
    }
  }

  private boolean isTooManyStoreFiles(Region region) {

    // When compaction is disabled, the region is flushable
    if (!region.getTableDescriptor().isCompactionEnabled()) {
      return false;
    }

    for (Store store : region.getStores()) {
      if (store.hasTooManyStoreFiles()) {
        return true;
      }
    }
    return false;
  }

  private int getStoreFileCount(Region region) {
    int count = 0;
    for (Store store : region.getStores()) {
      count += store.getStorefilesCount();
    }
    return count;
  }

  /**
   * Check if the regionserver's memstore memory usage is greater than the
   * limit. If so, flush regions with the biggest memstores until we're down
   * to the lower limit. This method blocks callers until we're down to a safe
   * amount of memstore consumption.
   */
  public void reclaimMemStoreMemory() {
    try (TraceScope scope = TraceUtil.createTrace("MemStoreFlusher.reclaimMemStoreMemory")) {
      FlushType flushType = isAboveHighWaterMark();
      if (flushType != FlushType.NORMAL) {
        TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
        long start = EnvironmentEdgeManager.currentTime();
        long nextLogTimeMs = start;
        synchronized (this.blockSignal) {
          boolean blocked = false;
          long startTime = 0;
          boolean interrupted = false;
          try {
            flushType = isAboveHighWaterMark();
            while (flushType != FlushType.NORMAL && !server.isStopped()) {
              server.cacheFlusher.setFlushType(flushType);
              if (!blocked) {
                startTime = EnvironmentEdgeManager.currentTime();
                if (!server.getRegionServerAccounting().isOffheap()) {
                  logMsg("global memstore heapsize",
                      server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                      server.getRegionServerAccounting().getGlobalMemStoreLimit());
                } else {
                  switch (flushType) {
                    case ABOVE_OFFHEAP_HIGHER_MARK:
                      logMsg("the global offheap memstore datasize",
                          server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
                          server.getRegionServerAccounting().getGlobalMemStoreLimit());
                      break;
                    case ABOVE_ONHEAP_HIGHER_MARK:
                      logMsg("global memstore heapsize",
                          server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                          server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
                      break;
                    default:
                      break;
                  }
                }
              }
              blocked = true;
              wakeupFlushThread();
              try {
                // we should be able to wait forever, but we've seen a bug where
                // we miss a notify, so put a 5 second bound on it at least.
                blockSignal.wait(5 * 1000);
              } catch (InterruptedException ie) {
                LOG.warn("Interrupted while waiting");
                interrupted = true;
              }
              long nowMs = EnvironmentEdgeManager.currentTime();
              if (nowMs >= nextLogTimeMs) {
                LOG.warn("Memstore is above high water mark; updates have been blocked for {} ms",
                    nowMs - start);
                nextLogTimeMs = nowMs + 1000;
              }
              flushType = isAboveHighWaterMark();
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }

          if (blocked) {
            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
            if (totalTime > 0) {
              this.updatesBlockedMsHighWater.add(totalTime);
            }
            LOG.info("Unblocking updates for server " + server.toString());
          }
        }
      } else {
        flushType = isAboveLowWaterMark();
        if (flushType != FlushType.NORMAL) {
          server.cacheFlusher.setFlushType(flushType);
          wakeupFlushThread();
        }
      }
    }
  }

  private void logMsg(String type, long val, long max) {
    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
        TraditionalBinaryPrefix.long2String(val, "", 1),
        TraditionalBinaryPrefix.long2String(max, "", 1));
  }

  @Override
  public String toString() {
    return "flush_queue=" + flushQueue.size();
  }

  public String dumpQueue() {
    StringBuilder queueList = new StringBuilder();
    queueList.append("Flush Queue dump:\n");
    queueList.append("  Flush Queue:\n");
    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();

    while (it.hasNext()) {
      queueList.append("    " + it.next().toString());
      queueList.append("\n");
    }

    return queueList.toString();
  }

  /**
   * Register a FlushRequestListener.
   * @param listener the listener to register
   */
  @Override
  public void registerFlushRequestListener(final FlushRequestListener listener) {
    this.flushRequestListeners.add(listener);
  }

  /**
   * Unregister the given FlushRequestListener.
   * @param listener the listener to remove
   * @return true when the passed listener was unregistered successfully.
   */
  @Override
  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
    return this.flushRequestListeners.remove(listener);
  }

  /**
   * Sets the global memstore limit to a new size.
   * @param globalMemStoreSize the new global memstore limit, in bytes
   */
  @Override
  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
    reclaimMemStoreMemory();
  }

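  /**
   * Token placed on the flush queue. Extends {@link Delayed} so entries can sit in the
   * DelayQueue until their (re)queue delay has expired.
   */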
  interface FlushQueueEntry extends Delayed {
  }

  /**
   * Data structure used in the flush queue.  Holds region and retry count.
   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
   * construction, the delay is zero. When added to a delay queue, we'll come
   * out near immediately.  Call {@link #requeue(long)} passing a delay in
   * milliseconds before re-adding to the delay queue if you want it to stay
   * there a while.
   */
  static class FlushRegionEntry implements FlushQueueEntry {
    private final HRegion region;

    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;

    private final boolean forceFlushAllStores;

    private final FlushLifeCycleTracker tracker;

    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
      this.forceFlushAllStores = forceFlushAllStores;
      this.tracker = tracker;
    }

    /**
     * @param maximumWait the longest wait, in milliseconds, that we are willing to tolerate
     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
     */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }

    /**
     * @return Count of times {@link #requeue(long)} was called; i.e. this is
     * the number of times we've been requeued.
     */
    public int getRequeueCount() {
      return this.requeueCount;
    }

    /**
     * @return whether we need to flush all stores.
     */
    public boolean isForceFlushAllStores() {
      return forceFlushAllStores;
    }

    public FlushLifeCycleTracker getTracker() {
      return tracker;
    }

    /**
     * @param when When to expire; when to come back out of the queue.
     * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
     * to whatever you pass.
     * @return This.
     */
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
          TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      // Delay is compared first. If there is a tie, compare the entries' hash codes.
      int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
      if (ret != 0) {
        return ret;
      }
      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
      return hashCode() - otherEntry.hashCode();
    }

    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
    }

    @Override
    public int hashCode() {
      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
      return hash ^ region.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      FlushRegionEntry other = (FlushRegionEntry) obj;
      if (!Bytes.equals(this.region.getRegionInfo().getRegionName(),
          other.region.getRegionInfo().getRegionName())) {
        return false;
      }
      return compareTo(other) == 0;
    }
  }
}