001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.context.Scope;
022import java.io.IOException;
023import java.lang.Thread.UncaughtExceptionHandler;
024import java.util.ArrayList;
025import java.util.ConcurrentModificationException;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.SortedMap;
032import java.util.concurrent.BlockingQueue;
033import java.util.concurrent.DelayQueue;
034import java.util.concurrent.Delayed;
035import java.util.concurrent.ThreadFactory;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.atomic.LongAdder;
039import java.util.concurrent.locks.ReentrantReadWriteLock;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.DroppedSnapshotException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.client.RegionReplicaUtil;
044import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
045import org.apache.hadoop.hbase.trace.TraceUtil;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
049import org.apache.hadoop.hbase.util.Threads;
050import org.apache.hadoop.ipc.RemoteException;
051import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
057
058/**
 * Flushes memstores on request using a pool of dedicated flush handler threads. NOTE: Flushing runs
 * on dedicated threads rather than in a Chore because their sleep time can be interrupted when
 * there is something to do, rather than the Chore sleep time which is invariant.
062 * @see FlushRequester
063 */
064@InterfaceAudience.Private
065public class MemStoreFlusher implements FlushRequester {
066  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);
067
  private Configuration conf;
  // These two data members go together. Any entry in the one must have
  // a corresponding entry in the other. (Exception: the WAKEUPFLUSH_INSTANCE signal token is only
  // ever added to flushQueue.) Paths that add/remove a region in both do so while synchronized on
  // regionsInQueue.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  // True while a WAKEUPFLUSH_INSTANCE is queued and not yet consumed, so duplicate wakeup tokens
  // are not piled up. A flush handler clears it before each poll of the queue.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  // Max time (ms) a flush handler blocks polling the queue before re-checking memory pressure.
  private final long threadWakeFrequency;
  private final HRegionServer server;
  // Flush handlers hold the read lock while flushing a region; interruptIfNecessary() takes the
  // write lock so that handlers are only interrupted between flushes.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Monitor on which writers blocked above the high water mark wait; notified after each flush.
  private final Object blockSignal = new Object();

  // Max time (ms) a flush may be postponed because the region has too many store files.
  private long blockingWaitTime;
  // Cumulative milliseconds that updates spent blocked above the high water mark.
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  private final FlushHandler[] flushHandlers;
  // NOTE(review): plain ArrayList; registering/unregistering concurrently with a notification from
  // a flush handler is not synchronized here — confirm callers only register during startup.
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
085
086  /**
087   * Singleton instance inserted into flush queue used for signaling.
088   */
089  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
090    @Override
091    public long getDelay(TimeUnit unit) {
092      return 0;
093    }
094
095    @Override
096    public int compareTo(Delayed o) {
097      return -1;
098    }
099
100    @Override
101    public boolean equals(Object obj) {
102      return obj == this;
103    }
104
105    @Override
106    public int hashCode() {
107      return 42;
108    }
109  };
110
111  /**
112   * nn
113   */
114  public MemStoreFlusher(final Configuration conf, final HRegionServer server) {
115    super();
116    this.conf = conf;
117    this.server = server;
118    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
119    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
120    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
121    if (server != null) {
122      if (handlerCount < 1) {
123        LOG.warn(
124          "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
125          handlerCount);
126        handlerCount = 1;
127      }
128      LOG.info("globalMemStoreLimit="
129        + TraditionalBinaryPrefix
130          .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
131        + ", globalMemStoreLimitLowMark="
132        + TraditionalBinaryPrefix.long2String(
133          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
134        + ", Offheap=" + (this.server.getRegionServerAccounting().isOffheap()));
135    }
136    this.flushHandlers = new FlushHandler[handlerCount];
137  }
138
139  public LongAdder getUpdatesBlockedMsHighWater() {
140    return this.updatesBlockedMsHighWater;
141  }
142
143  /**
144   * The memstore across all regions has exceeded the low water mark. Pick one region to flush and
145   * flush it synchronously (this is called from the flush thread)
146   * @return true if successful
147   */
148  private boolean flushOneForGlobalPressure(FlushType flushType) {
149    SortedMap<Long, HRegion> regionsBySize = null;
150    switch (flushType) {
151      case ABOVE_OFFHEAP_HIGHER_MARK:
152      case ABOVE_OFFHEAP_LOWER_MARK:
153        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
154        break;
155      case ABOVE_ONHEAP_HIGHER_MARK:
156      case ABOVE_ONHEAP_LOWER_MARK:
157      default:
158        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
159    }
160    Set<HRegion> excludedRegions = new HashSet<>();
161
162    double secondaryMultiplier =
163      ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
164
165    boolean flushedOne = false;
166    while (!flushedOne) {
167      // Find the biggest region that doesn't have too many storefiles (might be null!)
168      HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
169      // Find the biggest region, total, even if it might have too many flushes.
170      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
171      // Find the biggest region that is a secondary region
172      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
173      if (bestAnyRegion == null) {
174        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
175        bestAnyRegion = bestRegionReplica;
176      }
177      if (bestAnyRegion == null) {
178        LOG.error("Above memory mark but there are no flushable regions!");
179        return false;
180      }
181
182      HRegion regionToFlush;
183      long bestAnyRegionSize;
184      long bestFlushableRegionSize;
185      switch (flushType) {
186        case ABOVE_OFFHEAP_HIGHER_MARK:
187        case ABOVE_OFFHEAP_LOWER_MARK:
188          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
189          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
190          break;
191
192        case ABOVE_ONHEAP_HIGHER_MARK:
193        case ABOVE_ONHEAP_LOWER_MARK:
194          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
195          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
196          break;
197
198        default:
199          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
200          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
201      }
202      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
203        // Even if it's not supposed to be flushed, pick a region if it's more than twice
204        // as big as the best flushable one - otherwise when we're under pressure we make
205        // lots of little flushes and cause lots of compactions, etc, which just makes
206        // life worse!
207        if (LOG.isDebugEnabled()) {
208          LOG.debug("Under global heap pressure: " + "Region "
209            + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many "
210            + "store files, but is " + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
211            + " vs best flushable region's "
212            + TraditionalBinaryPrefix.long2String(bestFlushableRegionSize, "", 1)
213            + ". Choosing the bigger.");
214        }
215        regionToFlush = bestAnyRegion;
216      } else {
217        if (bestFlushableRegion == null) {
218          regionToFlush = bestAnyRegion;
219        } else {
220          regionToFlush = bestFlushableRegion;
221        }
222      }
223
224      long regionToFlushSize;
225      long bestRegionReplicaSize;
226      switch (flushType) {
227        case ABOVE_OFFHEAP_HIGHER_MARK:
228        case ABOVE_OFFHEAP_LOWER_MARK:
229          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
230          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
231          break;
232
233        case ABOVE_ONHEAP_HIGHER_MARK:
234        case ABOVE_ONHEAP_LOWER_MARK:
235          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
236          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
237          break;
238
239        default:
240          regionToFlushSize = regionToFlush.getMemStoreDataSize();
241          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
242      }
243
244      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
245        // A concurrency issue (such as splitting region) may happen such that the online region
246        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
247        // getBiggestMemStoreRegion(). This means that we can come out of the loop
248        LOG.debug("Above memory mark but there is no flushable region");
249        return false;
250      }
251
252      if (
253        regionToFlush == null || (bestRegionReplica != null
254          && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf)
255          && (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))
256      ) {
257        LOG.info("Refreshing storefiles of region " + bestRegionReplica
258          + " due to global heap pressure. Total memstore off heap size="
259          + TraditionalBinaryPrefix
260            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
261          + " memstore heap size=" + TraditionalBinaryPrefix
262            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
263        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
264        if (!flushedOne) {
265          LOG.info("Excluding secondary region " + bestRegionReplica
266            + " - trying to find a different region to refresh files.");
267          excludedRegions.add(bestRegionReplica);
268        }
269      } else {
270        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
271          + "Flush type=" + flushType.toString() + ", Total Memstore Heap size="
272          + TraditionalBinaryPrefix
273            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)
274          + ", Total Memstore Off-Heap size="
275          + TraditionalBinaryPrefix
276            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
277          + ", Region memstore size="
278          + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
279        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);
280
281        if (!flushedOne) {
282          LOG.info("Excluding unflushable region " + regionToFlush
283            + " - trying to find a different region to flush.");
284          excludedRegions.add(regionToFlush);
285        }
286      }
287    }
288    return true;
289  }
290
291  /** Returns Return memstore offheap size or null if <code>r</code> is null */
292  private static long getMemStoreOffHeapSize(HRegion r) {
293    return r == null ? 0 : r.getMemStoreOffHeapSize();
294  }
295
296  /** Returns Return memstore heap size or null if <code>r</code> is null */
297  private static long getMemStoreHeapSize(HRegion r) {
298    return r == null ? 0 : r.getMemStoreHeapSize();
299  }
300
301  /** Returns Return memstore data size or null if <code>r</code> is null */
302  private static long getMemStoreDataSize(HRegion r) {
303    return r == null ? 0 : r.getMemStoreDataSize();
304  }
305
  /**
   * Worker thread that drains the flush queue. A poll timeout or the wakeup token triggers a check
   * of global memory pressure (flushing one region if above the low water mark); a
   * {@link FlushRegionEntry} triggers a flush of that region. The loop ends when the server stops,
   * when {@code flushRegion} returns false, or when an unexpected exception occurs and the file
   * system check fails. On exit it clears both queues and wakes any blocked writers.
   */
  private class FlushHandler extends Thread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
            FlushType type = isAboveLowWaterMark();
            if (type != FlushType.NORMAL) {
              LOG.debug("Flush thread woke up because memory above low water="
                + TraditionalBinaryPrefix.long2String(
                  server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
              // For offheap memstore, even if the lower water mark was breached due to heap
              // overhead
              // we still select the regions based on the region's memstore data size.
              // TODO : If we want to decide based on heap over head it can be done without tracking
              // it per region.
              if (!flushOneForGlobalPressure(type)) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          // A false result means the server is aborting or its file system is unusable.
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          // Interrupts come from interruptIfNecessary(); the loop condition re-checks whether the
          // server has stopped.
          continue;
        } catch (ConcurrentModificationException ex) {
          // NOTE(review): presumably guards against flushRequestListeners being modified while a
          // notification iterates it — confirm.
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }
369
370  private void wakeupFlushThread() {
371    if (wakeupPending.compareAndSet(false, true)) {
372      flushQueue.add(WAKEUPFLUSH_INSTANCE);
373    }
374  }
375
376  private HRegion getBiggestMemStoreRegion(SortedMap<Long, HRegion> regionsBySize,
377    Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
378    synchronized (regionsInQueue) {
379      for (HRegion region : regionsBySize.values()) {
380        if (excludedRegions.contains(region)) {
381          continue;
382        }
383
384        if (region.writestate.flushing || !region.writestate.writesEnabled) {
385          continue;
386        }
387
388        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
389          continue;
390        }
391        return region;
392      }
393    }
394    return null;
395  }
396
397  private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
398    Set<HRegion> excludedRegions) {
399    synchronized (regionsInQueue) {
400      for (HRegion region : regionsBySize.values()) {
401        if (excludedRegions.contains(region)) {
402          continue;
403        }
404
405        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
406          continue;
407        }
408
409        return region;
410      }
411    }
412    return null;
413  }
414
415  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
416    try {
417      return region.refreshStoreFiles();
418    } catch (IOException e) {
419      LOG.warn("Refreshing store files failed with exception", e);
420    }
421    return false;
422  }
423
424  /**
425   * Return the FlushType if global memory usage is above the high watermark
426   */
427  private FlushType isAboveHighWaterMark() {
428    return server.getRegionServerAccounting().isAboveHighWaterMark();
429  }
430
431  /**
432   * Return the FlushType if we're above the low watermark
433   */
434  private FlushType isAboveLowWaterMark() {
435    return server.getRegionServerAccounting().isAboveLowWaterMark();
436  }
437
438  @Override
439  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
440    return this.requestFlush(r, null, tracker);
441  }
442
443  @Override
444  public boolean requestFlush(HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
445    synchronized (regionsInQueue) {
446      FlushRegionEntry existFqe = regionsInQueue.get(r);
447      if (existFqe != null) {
448        // if a delayed one exists and not reach the time to execute, just remove it
449        if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
450          LOG.info("Remove the existing delayed flush entry for {}, "
451            + "because we need to flush it immediately", r);
452          this.regionsInQueue.remove(r);
453          this.flushQueue.remove(existFqe);
454          r.decrementFlushesQueuedCount();
455        } else {
456          tracker.notExecuted("Flush already requested on " + r);
457          return false;
458        }
459      }
460
461      // This entry has no delay so it will be added at the top of the flush
462      // queue. It'll come out near immediately.
463      FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
464      this.regionsInQueue.put(r, fqe);
465      this.flushQueue.add(fqe);
466      r.incrementFlushesQueuedCount();
467      return true;
468    }
469  }
470
471  @Override
472  public boolean requestDelayedFlush(HRegion r, long delay) {
473    synchronized (regionsInQueue) {
474      if (!regionsInQueue.containsKey(r)) {
475        // This entry has some delay
476        FlushRegionEntry fqe = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
477        fqe.requeue(delay);
478        this.regionsInQueue.put(r, fqe);
479        this.flushQueue.add(fqe);
480        r.incrementFlushesQueuedCount();
481        return true;
482      }
483      return false;
484    }
485  }
486
487  public int getFlushQueueSize() {
488    return flushQueue.size();
489  }
490
491  /**
492   * Only interrupt once it's done with a run through the work loop.
493   */
494  void interruptIfNecessary() {
495    lock.writeLock().lock();
496    try {
497      for (FlushHandler flushHander : flushHandlers) {
498        if (flushHander != null) flushHander.interrupt();
499      }
500    } finally {
501      lock.writeLock().unlock();
502    }
503  }
504
505  synchronized void start(UncaughtExceptionHandler eh) {
506    ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
507      .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
508      .setDaemon(true).setUncaughtExceptionHandler(eh).build();
509    for (int i = 0; i < flushHandlers.length; i++) {
510      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
511      flusherThreadFactory.newThread(flushHandlers[i]);
512      flushHandlers[i].start();
513    }
514  }
515
516  boolean isAlive() {
517    for (FlushHandler flushHander : flushHandlers) {
518      if (flushHander != null && flushHander.isAlive()) {
519        return true;
520      }
521    }
522    return false;
523  }
524
525  void join() {
526    for (FlushHandler flushHander : flushHandlers) {
527      if (flushHander != null) {
528        Threads.shutdown(flushHander);
529      }
530    }
531  }
532
533  /**
534   * A flushRegion that checks store file count. If too many, puts the flush on delay queue to retry
535   * later. n * @return true if the region was successfully flushed, false otherwise. If false,
536   * there will be accompanying log messages explaining why the region was not flushed.
537   */
538  private boolean flushRegion(final FlushRegionEntry fqe) {
539    HRegion region = fqe.region;
540    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
541      if (fqe.isMaximumWait(this.blockingWaitTime)) {
542        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime)
543          + "ms on a compaction to clean up 'too many store files'; waited "
544          + "long enough... proceeding with flush of "
545          + region.getRegionInfo().getRegionNameAsString());
546      } else {
547        // If this is first time we've been put off, then emit a log message.
548        if (fqe.getRequeueCount() <= 0) {
549          // Note: We don't impose blockingStoreFiles constraint on meta regions
550          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
551            region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
552            this.blockingWaitTime);
553          final CompactSplit compactSplitThread = server.getCompactSplitThread();
554          if (!compactSplitThread.requestSplit(region)) {
555            try {
556              compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
557            } catch (IOException e) {
558              e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
559              LOG.error("Cache flush failed for region "
560                + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
561            }
562          }
563        }
564
565        // Put back on the queue. Have it come back out of the queue
566        // after a delay of this.blockingWaitTime / 100 ms.
567        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
568        // Tell a lie, it's not flushed but it's ok
569        return true;
570      }
571    }
572    return flushRegion(region, false, fqe.families, fqe.getTracker());
573  }
574
575  /**
576   * Flush a region.
577   * @param region         Region to flush.
578   * @param emergencyFlush Set if we are being force flushed. If true the region needs to be removed
579   *                       from the flush queue. If false, when we were called from the main flusher
580   *                       run loop and we got the entry to flush by calling poll on the flush queue
581   *                       (which removed it).
582   * @param families       stores of region to flush.
583   * @return true if the region was successfully flushed, false otherwise. If false, there will be
584   *         accompanying log messages explaining why the region was not flushed.
585   */
586  private boolean flushRegion(HRegion region, boolean emergencyFlush, List<byte[]> families,
587    FlushLifeCycleTracker tracker) {
588    synchronized (this.regionsInQueue) {
589      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
590      // Use the start time of the FlushRegionEntry if available
591      if (fqe != null && emergencyFlush) {
592        // Need to remove from region from delay queue. When NOT an
593        // emergencyFlush, then item was removed via a flushQueue.poll.
594        flushQueue.remove(fqe);
595      }
596    }
597
598    tracker.beforeExecution();
599    lock.readLock().lock();
600    final CompactSplit compactSplitThread = server.getCompactSplitThread();
601    try {
602      notifyFlushRequest(region, emergencyFlush);
603      FlushResult flushResult = region.flushcache(families, false, tracker);
604      boolean shouldCompact = flushResult.isCompactionNeeded();
605      // We just want to check the size
606      boolean shouldSplit = region.checkSplit().isPresent();
607      if (shouldSplit) {
608        compactSplitThread.requestSplit(region);
609      } else if (shouldCompact) {
610        compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
611      }
612    } catch (DroppedSnapshotException ex) {
613      // Cache flush can fail in a few places. If it fails in a critical
614      // section, we get a DroppedSnapshotException and a replay of wal
615      // is required. Currently the only way to do this is a restart of
616      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
617      // where hdfs was bad but passed the hdfs check).
618      server.abort("Replay of WAL required. Forcing server shutdown", ex);
619      return false;
620    } catch (IOException ex) {
621      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
622      LOG.error("Cache flush failed" + (region != null
623        ? (" for region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
624        : ""), ex);
625      if (!server.checkFileSystem()) {
626        return false;
627      }
628    } finally {
629      lock.readLock().unlock();
630      wakeUpIfBlocking();
631      tracker.afterExecution();
632    }
633    return true;
634  }
635
636  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
637    FlushType type = null;
638    if (emergencyFlush) {
639      type = isAboveHighWaterMark();
640    }
641    if (type == null) {
642      type = isAboveLowWaterMark();
643    }
644    for (FlushRequestListener listener : flushRequestListeners) {
645      listener.flushRequested(type, region);
646    }
647  }
648
649  private void wakeUpIfBlocking() {
650    synchronized (blockSignal) {
651      blockSignal.notifyAll();
652    }
653  }
654
655  private boolean isTooManyStoreFiles(Region region) {
656
657    // When compaction is disabled, the region is flushable
658    if (!region.getTableDescriptor().isCompactionEnabled()) {
659      return false;
660    }
661
662    for (Store store : region.getStores()) {
663      if (store.hasTooManyStoreFiles()) {
664        return true;
665      }
666    }
667    return false;
668  }
669
670  private int getStoreFileCount(Region region) {
671    int count = 0;
672    for (Store store : region.getStores()) {
673      count += store.getStorefilesCount();
674    }
675    return count;
676  }
677
678  /**
679   * Check if the regionserver's memstore memory usage is greater than the limit. If so, flush
680   * regions with the biggest memstores until we're down to the lower limit. This method blocks
681   * callers until we're down to a safe amount of memstore consumption.
682   */
683  public void reclaimMemStoreMemory() {
684    Span span =
685      TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan();
686    try (Scope scope = span.makeCurrent()) {
687      FlushType flushType = isAboveHighWaterMark();
688      if (flushType != FlushType.NORMAL) {
689        span.addEvent("Force Flush. We're above high water mark.");
690        long start = EnvironmentEdgeManager.currentTime();
691        long nextLogTimeMs = start;
692        synchronized (this.blockSignal) {
693          boolean blocked = false;
694          long startTime = 0;
695          boolean interrupted = false;
696          try {
697            flushType = isAboveHighWaterMark();
698            while (flushType != FlushType.NORMAL && !server.isStopped()) {
699              if (!blocked) {
700                startTime = EnvironmentEdgeManager.currentTime();
701                if (!server.getRegionServerAccounting().isOffheap()) {
702                  logMsg("global memstore heapsize",
703                    server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
704                    server.getRegionServerAccounting().getGlobalMemStoreLimit());
705                } else {
706                  switch (flushType) {
707                    case ABOVE_OFFHEAP_HIGHER_MARK:
708                      logMsg("the global offheap memstore datasize",
709                        server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
710                        server.getRegionServerAccounting().getGlobalMemStoreLimit());
711                      break;
712                    case ABOVE_ONHEAP_HIGHER_MARK:
713                      logMsg("global memstore heapsize",
714                        server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
715                        server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
716                      break;
717                    default:
718                      break;
719                  }
720                }
721              }
722              blocked = true;
723              wakeupFlushThread();
724              try {
725                // we should be able to wait forever, but we've seen a bug where
726                // we miss a notify, so put a 5 second bound on it at least.
727                blockSignal.wait(5 * 1000);
728              } catch (InterruptedException ie) {
729                LOG.warn("Interrupted while waiting");
730                interrupted = true;
731              }
732              long nowMs = EnvironmentEdgeManager.currentTime();
733              if (nowMs >= nextLogTimeMs) {
734                LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
735                nextLogTimeMs = nowMs + 1000;
736              }
737              flushType = isAboveHighWaterMark();
738            }
739          } finally {
740            if (interrupted) {
741              Thread.currentThread().interrupt();
742            }
743          }
744
745          if (blocked) {
746            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
747            if (totalTime > 0) {
748              this.updatesBlockedMsHighWater.add(totalTime);
749            }
750            LOG.info("Unblocking updates for server " + server.toString());
751          }
752        }
753      } else {
754        flushType = isAboveLowWaterMark();
755        if (flushType != FlushType.NORMAL) {
756          wakeupFlushThread();
757        }
758        span.end();
759      }
760    }
761  }
762
763  private void logMsg(String type, long val, long max) {
764    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
765      TraditionalBinaryPrefix.long2String(val, "", 1),
766      TraditionalBinaryPrefix.long2String(max, "", 1));
767  }
768
769  @Override
770  public String toString() {
771    return "flush_queue=" + flushQueue.size();
772  }
773
774  public String dumpQueue() {
775    StringBuilder queueList = new StringBuilder();
776    queueList.append("Flush Queue Queue dump:\n");
777    queueList.append("  Flush Queue:\n");
778    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
779
780    while (it.hasNext()) {
781      queueList.append("    " + it.next().toString());
782      queueList.append("\n");
783    }
784
785    return queueList.toString();
786  }
787
788  /**
789   * Register a MemstoreFlushListener n
790   */
791  @Override
792  public void registerFlushRequestListener(final FlushRequestListener listener) {
793    this.flushRequestListeners.add(listener);
794  }
795
796  /**
797   * Unregister the listener from MemstoreFlushListeners n * @return true when passed listener is
798   * unregistered successfully.
799   */
800  @Override
801  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
802    return this.flushRequestListeners.remove(listener);
803  }
804
805  /**
806   * Sets the global memstore limit to a new size. n
807   */
808  @Override
809  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
810    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
811    reclaimMemStoreMemory();
812  }
813
814  interface FlushQueueEntry extends Delayed {
815  }
816
817  /**
818   * Datastructure used in the flush queue. Holds region and retry count. Keeps tabs on how old this
819   * object is. Implements {@link Delayed}. On construction, the delay is zero. When added to a
820   * delay queue, we'll come out near immediately. Call {@link #requeue(long)} passing delay in
821   * milliseconds before readding to delay queue if you want it to stay there a while.
822   */
823  static class FlushRegionEntry implements FlushQueueEntry {
824    private final HRegion region;
825
826    private final long createTime;
827    private long whenToExpire;
828    private int requeueCount = 0;
829
830    private final List<byte[]> families;
831
832    private final FlushLifeCycleTracker tracker;
833
834    FlushRegionEntry(final HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
835      this.region = r;
836      this.createTime = EnvironmentEdgeManager.currentTime();
837      this.whenToExpire = this.createTime;
838      this.families = families;
839      this.tracker = tracker;
840    }
841
842    /**
843     * n * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
844     */
845    public boolean isMaximumWait(final long maximumWait) {
846      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
847    }
848
849    /** Returns True if the entry is a delay flush task */
850    protected boolean isDelay() {
851      return this.whenToExpire > this.createTime;
852    }
853
854    /**
855     * @return Count of times {@link #requeue(long)} was called; i.e this is number of times we've
856     *         been requeued.
857     */
858    public int getRequeueCount() {
859      return this.requeueCount;
860    }
861
862    public FlushLifeCycleTracker getTracker() {
863      return tracker;
864    }
865
866    /**
867     * @param when When to expire, when to come up out of the queue. Specify in milliseconds. This
868     *             method adds EnvironmentEdgeManager.currentTime() to whatever you pass.
869     * @return This.
870     */
871    public FlushRegionEntry requeue(final long when) {
872      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
873      this.requeueCount++;
874      return this;
875    }
876
877    @Override
878    public long getDelay(TimeUnit unit) {
879      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
880        TimeUnit.MILLISECONDS);
881    }
882
883    @Override
884    public int compareTo(Delayed other) {
885      // Delay is compared first. If there is a tie, compare region's hash code
886      int ret =
887        Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS))
888          .intValue();
889      if (ret != 0) {
890        return ret;
891      }
892      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
893      return hashCode() - otherEntry.hashCode();
894    }
895
896    @Override
897    public String toString() {
898      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
899    }
900
901    @Override
902    public int hashCode() {
903      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
904      return hash ^ region.hashCode();
905    }
906
907    @Override
908    public boolean equals(Object obj) {
909      if (this == obj) {
910        return true;
911      }
912      if (obj == null || getClass() != obj.getClass()) {
913        return false;
914      }
915      FlushRegionEntry other = (FlushRegionEntry) obj;
916      if (
917        !Bytes.equals(this.region.getRegionInfo().getRegionName(),
918          other.region.getRegionInfo().getRegionName())
919      ) {
920        return false;
921      }
922      return compareTo(other) == 0;
923    }
924  }
925}