/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that flushes cache on request
 *
 * NOTE: This class runs its own flush handler threads rather than using a Chore because the
 * sleep time can be interrupted when there is something to do, whereas a Chore sleeps for an
 * invariant period.
 *
 * @see FlushRequester
 */
@InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  private Configuration conf;
  // These two data members go together.  Any entry in the one must have
  // a corresponding entry in the other.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  private final long threadWakeFrequency;
  private final HRegionServer server;
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Object blockSignal = new Object();

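  // Maximum time a flush request may be delayed (and requeued) while waiting for compactions to
  // bring a region back under its store file limit; see flushRegion(FlushRegionEntry).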
  private long blockingWaitTime;
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);

  /**
   * Singleton instance inserted into flush queue used for signaling.
   */
  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

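    // Always sort ahead of any other entry; together with the zero delay above, this makes the
    // wakeup token immediately available to whichever flush handler polls the queue next.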
    @Override
    public int compareTo(Delayed o) {
      return -1;
    }

    @Override
    public boolean equals(Object obj) {
      return obj == this;
    }

    @Override
    public int hashCode() {
      return 42;
    }
  };

  /**
   * @param conf the region server's configuration
   * @param server the region server this flusher runs on
   */
  public MemStoreFlusher(final Configuration conf,
      final HRegionServer server) {
    super();
    this.conf = conf;
    this.server = server;
    this.threadWakeFrequency =
        conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
      90000);
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    if (handlerCount < 1) {
      LOG.warn("hbase.hstore.flusher.count was configured to {} which is less than 1, "
          + "corrected to 1", handlerCount);
      handlerCount = 1;
    }
    this.flushHandlers = new FlushHandler[handlerCount];
    LOG.info("globalMemStoreLimit="
        + TraditionalBinaryPrefix
            .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
        + ", globalMemStoreLimitLowMark="
        + TraditionalBinaryPrefix.long2String(
          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
        + ", Offheap="
        + (this.server.getRegionServerAccounting().isOffheap()));
  }

  public LongAdder getUpdatesBlockedMsHighWater() {
    return this.updatesBlockedMsHighWater;
  }
  /**
   * The memstore across all regions has exceeded the low water mark. Pick
   * one region to flush and flush it synchronously (this is called from the
   * flush thread)
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure(FlushType flushType) {
    SortedMap<Long, HRegion> regionsBySize = null;
    switch(flushType) {
      case ABOVE_OFFHEAP_HIGHER_MARK:
      case ABOVE_OFFHEAP_LOWER_MARK:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
        break;
      case ABOVE_ONHEAP_HIGHER_MARK:
      case ABOVE_ONHEAP_LOWER_MARK:
      default:
        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
    }
    Set<HRegion> excludedRegions = new HashSet<>();

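    // A secondary (read replica) region is only picked for a store file refresh if its memstore
    // is more than this multiple of the chosen primary candidate's memstore; see the comparison
    // against bestRegionReplicaSize below.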
    double secondaryMultiplier
      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles (might be null!)
      HRegion bestFlushableRegion =
          getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
      // Find the biggest region, total, even if it might have too many store files.
      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
      // Find the biggest region that is a secondary region
      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
      if (bestAnyRegion == null) {
        // If bestAnyRegion is null, fall back to the replica; it may be null too, which the
        // next check handles.
        bestAnyRegion = bestRegionReplica;
      }
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;
      long bestAnyRegionSize;
      long bestFlushableRegionSize;
      switch(flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
          break;

        default:
          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
      }
      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " + "Region "
              + bestAnyRegion.getRegionInfo().getRegionNameAsString()
              + " has too many " + "store files, but is "
              + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
              + " vs best flushable region's "
              + TraditionalBinaryPrefix.long2String(
              bestFlushableRegionSize, "", 1)
              + ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      long regionToFlushSize;
      long bestRegionReplicaSize;
      switch(flushType) {
        case ABOVE_OFFHEAP_HIGHER_MARK:
        case ABOVE_OFFHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
          break;

        case ABOVE_ONHEAP_HIGHER_MARK:
        case ABOVE_ONHEAP_LOWER_MARK:
          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
          break;

        default:
          regionToFlushSize = regionToFlush.getMemStoreDataSize();
          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
      }

      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
        // A concurrency issue (such as a region split) may mean that a region which looked
        // online in getCopyOfOnlineRegionsSortedByXX() is no longer eligible by the time
        // getBiggestMemStoreRegion() runs, leaving nothing to flush; bail out of the loop.
        LOG.debug("Above memory mark but there is no flushable region");
        return false;
      }

      if (regionToFlush == null ||
          (bestRegionReplica != null &&
           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
           (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) {
        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
            " due to global heap pressure. Total memstore off heap size=" +
            TraditionalBinaryPrefix.long2String(
              server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
            " memstore heap size=" + TraditionalBinaryPrefix.long2String(
              server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
        if (!flushedOne) {
          LOG.info("Excluding secondary region " + bestRegionReplica +
              " - trying to find a different region to refresh files.");
          excludedRegions.add(bestRegionReplica);
        }
      } else {
        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
            "Flush type=" + flushType.toString() +
            ", Total Memstore Heap size=" +
            TraditionalBinaryPrefix.long2String(
                server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) +
            ", Total Memstore Off-Heap size=" +
            TraditionalBinaryPrefix.long2String(
                server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
            ", Region memstore size=" +
            TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
        flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);

        if (!flushedOne) {
          LOG.info("Excluding unflushable region " + regionToFlush +
              " - trying to find a different region to flush.");
          excludedRegions.add(regionToFlush);
        }
      }
    }
    return true;
  }

  /**
   * @return Return memstore offheap size or 0 if <code>r</code> is null
   */
  private static long getMemStoreOffHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreOffHeapSize();
  }

  /**
   * @return Return memstore heap size or 0 if <code>r</code> is null
   */
  private static long getMemStoreHeapSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreHeapSize();
  }

  /**
   * @return Return memstore data size or 0 if <code>r</code> is null
   */
  private static long getMemStoreDataSize(HRegion r) {
    return r == null ? 0 : r.getMemStoreDataSize();
  }

  private class FlushHandler extends Thread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
            FlushType type = isAboveLowWaterMark();
            if (type != FlushType.NORMAL) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + TraditionalBinaryPrefix.long2String(
                    server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
              // For offheap memstore, even if the lower water mark was breached due to heap
              // overhead we still select the regions based on the region's memstore data size.
              // TODO : If we want to decide based on heap overhead it can be done without
              // tracking it per region.
              if (!flushOneForGlobalPressure(type)) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }

  private void wakeupFlushThread() {
    if (wakeupPending.compareAndSet(false, true)) {
      flushQueue.add(WAKEUPFLUSH_INSTANCE);
    }
  }

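  /**
   * Pick the region with the biggest memstore from the supplied map (expected to be ordered
   * biggest first), skipping excluded regions, regions that are already flushing or have writes
   * disabled, and, when <code>checkStoreFileCount</code> is set, regions that already carry too
   * many store files.
   */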
  private HRegion getBiggestMemStoreRegion(
      SortedMap<Long, HRegion> regionsBySize,
      Set<HRegion> excludedRegions,
      boolean checkStoreFileCount) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (region.writestate.flushing || !region.writestate.writesEnabled) {
          continue;
        }

        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
          continue;
        }
        return region;
      }
    }
    return null;
  }

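  /**
   * Pick the biggest secondary (non-default) region replica from the supplied map, skipping
   * excluded regions. Such a replica is a candidate for a store file refresh rather than a flush.
   */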
  private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
      Set<HRegion> excludedRegions) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
          continue;
        }

        return region;
      }
    }
    return null;
  }

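  /**
   * Ask the region to refresh its store files; for a secondary replica this picks up files
   * flushed by the primary, which in turn lets the replica drop the corresponding memstore data.
   * Returns false if the refresh failed.
   */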
  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
    try {
      return region.refreshStoreFiles();
    } catch (IOException e) {
      LOG.warn("Refreshing store files failed with exception", e);
    }
    return false;
  }

  /**
   * Return the flush type if global memory usage is above the high watermark, or
   * FlushType.NORMAL if it is not.
   */
  private FlushType isAboveHighWaterMark() {
    return server.getRegionServerAccounting().isAboveHighWaterMark();
  }

  /**
   * Return the flush type if global memory usage is above the low watermark, or
   * FlushType.NORMAL if it is not.
   */
  private FlushType isAboveLowWaterMark() {
    return server.getRegionServerAccounting().isAboveLowWaterMark();
  }

  @Override
  public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
                              FlushLifeCycleTracker tracker) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue. It'll come out near immediately.
        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
        r.incrementFlushesQueuedCount();
        return true;
      } else {
        tracker.notExecuted("Flush already requested on " + r);
        return false;
      }
    }
  }

  @Override
  public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
        FlushRegionEntry fqe =
            new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
        r.incrementFlushesQueuedCount();
        return true;
      }
      return false;
    }
  }

  public int getFlushQueueSize() {
    return flushQueue.size();
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    lock.writeLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null) {
          flushHandler.interrupt();
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  synchronized void start(UncaughtExceptionHandler eh) {
    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
        server.getServerName().toShortString() + "-MemStoreFlusher", eh);
    for (int i = 0; i < flushHandlers.length; i++) {
      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
      flusherThreadFactory.newThread(flushHandlers[i]);
      flushHandlers[i].start();
    }
  }

  boolean isAlive() {
    for (FlushHandler flushHandler : flushHandlers) {
      if (flushHandler != null && flushHandler.isAlive()) {
        return true;
      }
    }
    return false;
  }

  void join() {
    for (FlushHandler flushHandler : flushHandlers) {
      if (flushHandler != null) {
        Threads.shutdown(flushHandler);
      }
    }
  }

  /**
   * A flushRegion that checks store file count.  If too many, puts the flush
   * on delay queue to retry later.
   * @param fqe the flush request entry to process
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionInfo().getRegionNameAsString());
      } else {
        // If this is the first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
              region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
              this.blockingWaitTime);
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(region,
                Thread.currentThread().getName());
            } catch (IOException e) {
              e = e instanceof RemoteException ?
                      ((RemoteException)e).unwrapRemoteException() : e;
              LOG.error("Cache flush failed for region " +
                Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
            }
          }
        }

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie: the region was not flushed, but returning true keeps the handler loop
        // going while we wait for compactions to catch up.
        return true;
      }
    }
    return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
  }

  /**
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, we were called
   * from the main flusher run loop and got the entry to flush by calling
   * poll on the flush queue (which removed it).
   * @param forceFlushAllStores whether we want to flush all stores.
   * @param tracker used to track the life cycle of the flush.
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
      FlushLifeCycleTracker tracker) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
        // Need to remove the region from the delay queue. When NOT an
        // emergencyFlush, the item was already removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }

    tracker.beforeExecution();
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(region,
          Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error(
        "Cache flush failed"
            + (region != null ? (" for region " +
                Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
              : ""), ex);
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
      tracker.afterExecution();
    }
    return true;
  }

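  // Tell any registered FlushRequestListeners that a flush is about to run; for emergency
  // flushes we also report which memory watermark triggered it.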
  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
    FlushType type = null;
    if (emergencyFlush) {
      type = isAboveHighWaterMark();
      if (type == FlushType.NORMAL) {
        type = isAboveLowWaterMark();
      }
    }
    for (FlushRequestListener listener : flushRequestListeners) {
      listener.flushRequested(type, region);
    }
  }


  private void wakeUpIfBlocking() {
    synchronized (blockSignal) {
      blockSignal.notifyAll();
    }
  }

  private boolean isTooManyStoreFiles(Region region) {
    // When compaction is disabled, the region is considered flushable regardless of
    // store file count.
    if (!region.getTableDescriptor().isCompactionEnabled()) {
      return false;
    }

    for (Store store : region.getStores()) {
      if (store.hasTooManyStoreFiles()) {
        return true;
      }
    }
    return false;
  }

  private int getStoreFileCount(Region region) {
    int count = 0;
    for (Store store : region.getStores()) {
      count += store.getStorefilesCount();
    }
    return count;
  }

  /**
   * Check if the regionserver's memstore memory usage is greater than the
   * limit. If so, flush regions with the biggest memstores until we're down
   * to the lower limit. This method blocks callers until we're down to a safe
   * amount of memstore consumption.
   */
  public void reclaimMemStoreMemory() {
    try (TraceScope scope = TraceUtil.createTrace("MemStoreFlusher.reclaimMemStoreMemory")) {
      FlushType flushType = isAboveHighWaterMark();
      if (flushType != FlushType.NORMAL) {
        TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
        long start = EnvironmentEdgeManager.currentTime();
        long nextLogTimeMs = start;
        synchronized (this.blockSignal) {
          boolean blocked = false;
          long startTime = 0;
          boolean interrupted = false;
          try {
            flushType = isAboveHighWaterMark();
            while (flushType != FlushType.NORMAL && !server.isStopped()) {
              if (!blocked) {
                startTime = EnvironmentEdgeManager.currentTime();
                if (!server.getRegionServerAccounting().isOffheap()) {
                  logMsg("global memstore heapsize",
                      server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                      server.getRegionServerAccounting().getGlobalMemStoreLimit());
                } else {
                  switch (flushType) {
                    case ABOVE_OFFHEAP_HIGHER_MARK:
                      logMsg("the global offheap memstore datasize",
                          server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
                          server.getRegionServerAccounting().getGlobalMemStoreLimit());
                      break;
                    case ABOVE_ONHEAP_HIGHER_MARK:
                      logMsg("global memstore heapsize",
                          server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                          server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
                      break;
                    default:
                      break;
                  }
                }
              }
              blocked = true;
              wakeupFlushThread();
              try {
                // we should be able to wait forever, but we've seen a bug where
                // we miss a notify, so put a 5 second bound on it at least.
                blockSignal.wait(5 * 1000);
              } catch (InterruptedException ie) {
                LOG.warn("Interrupted while waiting");
                interrupted = true;
              }
              long nowMs = EnvironmentEdgeManager.currentTime();
              if (nowMs >= nextLogTimeMs) {
                LOG.warn("Memstore is above high water mark; updates have been blocked for {} ms",
                    nowMs - start);
                nextLogTimeMs = nowMs + 1000;
              }
              flushType = isAboveHighWaterMark();
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }

          if (blocked) {
            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
            if (totalTime > 0) {
              this.updatesBlockedMsHighWater.add(totalTime);
            }
            LOG.info("Unblocking updates for server " + server.toString());
          }
        }
      } else {
        flushType = isAboveLowWaterMark();
        if (flushType != FlushType.NORMAL) {
          wakeupFlushThread();
        }
      }
    }
  }

  private void logMsg(String type, long val, long max) {
    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
        TraditionalBinaryPrefix.long2String(val, "", 1),
        TraditionalBinaryPrefix.long2String(max, "", 1));
  }

  @Override
  public String toString() {
    return "flush_queue=" + flushQueue.size();
  }

  public String dumpQueue() {
    StringBuilder queueList = new StringBuilder();
    queueList.append("Flush Queue dump:\n");
    queueList.append("  Flush Queue:\n");
    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();

    while (it.hasNext()) {
      queueList.append("    " + it.next().toString());
      queueList.append("\n");
    }

    return queueList.toString();
  }

  /**
   * Register a FlushRequestListener
   * @param listener the listener to notify when flushes are requested
   */
  @Override
  public void registerFlushRequestListener(final FlushRequestListener listener) {
    this.flushRequestListeners.add(listener);
  }

  /**
   * Unregister the given FlushRequestListener
   * @param listener the listener to remove
   * @return true when the passed listener is unregistered successfully.
   */
  @Override
  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
    return this.flushRequestListeners.remove(listener);
  }

  /**
   * Sets the global memstore limit to a new size.
   * @param globalMemStoreSize the new global memstore limit, in bytes
   */
  @Override
  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
    reclaimMemStoreMemory();
  }

  interface FlushQueueEntry extends Delayed {
  }

  /**
   * Datastructure used in the flush queue.  Holds region and retry count.
   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
   * construction, the delay is zero. When added to a delay queue, we'll come
   * out near immediately.  Call {@link #requeue(long)} passing delay in
   * milliseconds before re-adding to the delay queue if you want it to stay there
   * a while.
   */
  static class FlushRegionEntry implements FlushQueueEntry {
    private final HRegion region;

    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;

    private final boolean forceFlushAllStores;

    private final FlushLifeCycleTracker tracker;

    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
      this.forceFlushAllStores = forceFlushAllStores;
      this.tracker = tracker;
    }

    /**
     * @param maximumWait the longest acceptable delay, in milliseconds
     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
     */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }

    /**
     * @return Count of times {@link #requeue(long)} was called; i.e. this is the
     * number of times we've been requeued.
     */
    public int getRequeueCount() {
      return this.requeueCount;
    }

    /**
     * @return whether we need to flush all stores.
     */
    public boolean isForceFlushAllStores() {
      return forceFlushAllStores;
    }

    public FlushLifeCycleTracker getTracker() {
      return tracker;
    }

    /**
     * @param when When to expire; that is, when to come back out of the queue, specified in
     * milliseconds from now. This method adds EnvironmentEdgeManager.currentTime() to
     * whatever you pass.
     * @return This.
     */
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
          TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      // Delay is compared first. If there is a tie, compare region's hash code
      int ret = Long.compare(getDelay(TimeUnit.MILLISECONDS),
        other.getDelay(TimeUnit.MILLISECONDS));
      if (ret != 0) {
        return ret;
      }
      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
      return Integer.compare(hashCode(), otherEntry.hashCode());
    }

    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
    }

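    // Note: the hash mixes in the remaining delay, so it changes as the entry ages; it is only
    // meant to break ties in compareTo and to pair with the equals() check below.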
    @Override
    public int hashCode() {
      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
      return hash ^ region.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      FlushRegionEntry other = (FlushRegionEntry) obj;
      if (!Bytes.equals(this.region.getRegionInfo().getRegionName(),
          other.region.getRegionInfo().getRegionName())) {
        return false;
      }
      return compareTo(other) == 0;
    }
  }
}