001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.context.Scope;
022import java.io.IOException;
023import java.lang.Thread.UncaughtExceptionHandler;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.ConcurrentModificationException;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.SortedMap;
033import java.util.concurrent.BlockingQueue;
034import java.util.concurrent.DelayQueue;
035import java.util.concurrent.Delayed;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.LongAdder;
040import java.util.concurrent.locks.ReentrantReadWriteLock;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.DroppedSnapshotException;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.client.RegionReplicaUtil;
045import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
046import org.apache.hadoop.hbase.trace.TraceUtil;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.ipc.RemoteException;
052import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
058
059/**
 * Flushes the memstore ("cache") on request using a set of flush handler threads. NOTE: the
 * handlers extend Thread rather than Chore because the sleep time can be interrupted when there
 * is something to do, rather than the Chore sleep time which is invariant.
063 * @see FlushRequester
064 */
065@InterfaceAudience.Private
066public class MemStoreFlusher implements FlushRequester {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  // Configuration handed to the constructor; also consulted later for region replica settings.
  private Configuration conf;
  // These two data members go together. Any entry in the one must have
  // a corresponding entry in the other.
  // (The wake-up token WAKEUPFLUSH_INSTANCE is the exception: it only ever lives in flushQueue.)
  // Changes that touch both structures are made while holding the regionsInQueue monitor.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  // True while a wake-up token is considered pending; a flush handler resets it before each poll
  // so that wakeupFlushThread() enqueues at most one token at a time.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  // Maximum time (ms) a flush handler waits on an empty queue before re-checking memory pressure.
  private final long threadWakeFrequency;
  private final HRegionServer server;
  // Held in read mode while a region is being flushed and in write mode by interruptIfNecessary().
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Monitor that reclaimMemStoreMemory() waits on and wakeUpIfBlocking() notifies.
  private final Object blockSignal = new Object();

  // Value of "hbase.hstore.blockingWaitTime" (ms): how long a flush may be put off because the
  // region has too many store files.
  private long blockingWaitTime;
  // Accumulated time (ms) that callers of reclaimMemStoreMemory() spent blocked.
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  // One slot per flush handler thread; the slots are filled in by start().
  private final FlushHandler[] flushHandlers;
  // Listeners told about every flush this class performs (see notifyFlushRequest()).
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
086
087  /**
088   * Singleton instance inserted into flush queue used for signaling.
089   */
090  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
091    @Override
092    public long getDelay(TimeUnit unit) {
093      return 0;
094    }
095
096    @Override
097    public int compareTo(Delayed o) {
098      return -1;
099    }
100
101    @Override
102    public boolean equals(Object obj) {
103      return obj == this;
104    }
105
106    @Override
107    public int hashCode() {
108      return 42;
109    }
110  };
111
112  /**
113   * nn
114   */
115  public MemStoreFlusher(final Configuration conf, final HRegionServer server) {
116    super();
117    this.conf = conf;
118    this.server = server;
119    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
120    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
121    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
122    if (server != null) {
123      if (handlerCount < 1) {
124        LOG.warn(
125          "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
126          handlerCount);
127        handlerCount = 1;
128      }
129      LOG.info("globalMemStoreLimit="
130        + TraditionalBinaryPrefix
131          .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
132        + ", globalMemStoreLimitLowMark="
133        + TraditionalBinaryPrefix.long2String(
134          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
135        + ", Offheap=" + (this.server.getRegionServerAccounting().isOffheap()));
136    }
137    this.flushHandlers = new FlushHandler[handlerCount];
138  }
139
140  public LongAdder getUpdatesBlockedMsHighWater() {
141    return this.updatesBlockedMsHighWater;
142  }
143
144  /**
145   * The memstore across all regions has exceeded the low water mark. Pick one region to flush and
146   * flush it synchronously (this is called from the flush thread)
147   * @return true if successful
148   */
149  private boolean flushOneForGlobalPressure(FlushType flushType) {
150    SortedMap<Long, Collection<HRegion>> regionsBySize = null;
151    switch (flushType) {
152      case ABOVE_OFFHEAP_HIGHER_MARK:
153      case ABOVE_OFFHEAP_LOWER_MARK:
154        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
155        break;
156      case ABOVE_ONHEAP_HIGHER_MARK:
157      case ABOVE_ONHEAP_LOWER_MARK:
158      default:
159        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
160    }
161    Set<HRegion> excludedRegions = new HashSet<>();
162
163    double secondaryMultiplier =
164      ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
165
166    boolean flushedOne = false;
167    while (!flushedOne) {
168      // Find the biggest region that doesn't have too many storefiles (might be null!)
169      HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
170      // Find the biggest region, total, even if it might have too many flushes.
171      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
172      // Find the biggest region that is a secondary region
173      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
174      if (bestAnyRegion == null) {
175        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
176        bestAnyRegion = bestRegionReplica;
177      }
178      if (bestAnyRegion == null) {
179        LOG.error("Above memory mark but there are no flushable regions!");
180        return false;
181      }
182
183      HRegion regionToFlush;
184      long bestAnyRegionSize;
185      long bestFlushableRegionSize;
186      switch (flushType) {
187        case ABOVE_OFFHEAP_HIGHER_MARK:
188        case ABOVE_OFFHEAP_LOWER_MARK:
189          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
190          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
191          break;
192
193        case ABOVE_ONHEAP_HIGHER_MARK:
194        case ABOVE_ONHEAP_LOWER_MARK:
195          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
196          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
197          break;
198
199        default:
200          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
201          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
202      }
203      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
204        // Even if it's not supposed to be flushed, pick a region if it's more than twice
205        // as big as the best flushable one - otherwise when we're under pressure we make
206        // lots of little flushes and cause lots of compactions, etc, which just makes
207        // life worse!
208        if (LOG.isDebugEnabled()) {
209          LOG.debug("Under global heap pressure: " + "Region "
210            + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many "
211            + "store files, but is " + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
212            + " vs best flushable region's "
213            + TraditionalBinaryPrefix.long2String(bestFlushableRegionSize, "", 1)
214            + ". Choosing the bigger.");
215        }
216        regionToFlush = bestAnyRegion;
217      } else {
218        if (bestFlushableRegion == null) {
219          regionToFlush = bestAnyRegion;
220        } else {
221          regionToFlush = bestFlushableRegion;
222        }
223      }
224
225      long regionToFlushSize;
226      long bestRegionReplicaSize;
227      switch (flushType) {
228        case ABOVE_OFFHEAP_HIGHER_MARK:
229        case ABOVE_OFFHEAP_LOWER_MARK:
230          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
231          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
232          break;
233
234        case ABOVE_ONHEAP_HIGHER_MARK:
235        case ABOVE_ONHEAP_LOWER_MARK:
236          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
237          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
238          break;
239
240        default:
241          regionToFlushSize = regionToFlush.getMemStoreDataSize();
242          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
243      }
244
245      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
246        // A concurrency issue (such as splitting region) may happen such that the online region
247        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
248        // getBiggestMemStoreRegion(). This means that we can come out of the loop
249        LOG.debug("Above memory mark but there is no flushable region");
250        return false;
251      }
252
253      if (
254        regionToFlush == null || (bestRegionReplica != null
255          && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf)
256          && (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))
257      ) {
258        LOG.info("Refreshing storefiles of region " + bestRegionReplica
259          + " due to global heap pressure. Total memstore off heap size="
260          + TraditionalBinaryPrefix
261            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
262          + " memstore heap size=" + TraditionalBinaryPrefix
263            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
264        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
265        if (!flushedOne) {
266          LOG.info("Excluding secondary region " + bestRegionReplica
267            + " - trying to find a different region to refresh files.");
268          excludedRegions.add(bestRegionReplica);
269        }
270      } else {
271        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
272          + "Flush type=" + flushType.toString() + ", Total Memstore Heap size="
273          + TraditionalBinaryPrefix
274            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)
275          + ", Total Memstore Off-Heap size="
276          + TraditionalBinaryPrefix
277            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
278          + ", Region memstore size="
279          + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
280        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);
281
282        if (!flushedOne) {
283          LOG.info("Excluding unflushable region " + regionToFlush
284            + " - trying to find a different region to flush.");
285          excludedRegions.add(regionToFlush);
286        }
287      }
288    }
289    return true;
290  }
291
292  /**
293   * @return Return memstore offheap size or null if <code>r</code> is null
294   */
295  private static long getMemStoreOffHeapSize(HRegion r) {
296    return r == null ? 0 : r.getMemStoreOffHeapSize();
297  }
298
299  /**
300   * @return Return memstore heap size or null if <code>r</code> is null
301   */
302  private static long getMemStoreHeapSize(HRegion r) {
303    return r == null ? 0 : r.getMemStoreHeapSize();
304  }
305
306  /**
307   * @return Return memstore data size or null if <code>r</code> is null
308   */
309  private static long getMemStoreDataSize(HRegion r) {
310    return r == null ? 0 : r.getMemStoreDataSize();
311  }
312
313  private class FlushHandler extends Thread {
314
315    private FlushHandler(String name) {
316      super(name);
317    }
318
319    @Override
320    public void run() {
321      while (!server.isStopped()) {
322        FlushQueueEntry fqe = null;
323        try {
324          wakeupPending.set(false); // allow someone to wake us up again
325          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
326          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
327            FlushType type = isAboveLowWaterMark();
328            if (type != FlushType.NORMAL) {
329              LOG.debug("Flush thread woke up because memory above low water="
330                + TraditionalBinaryPrefix.long2String(
331                  server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
332              // For offheap memstore, even if the lower water mark was breached due to heap
333              // overhead
334              // we still select the regions based on the region's memstore data size.
335              // TODO : If we want to decide based on heap over head it can be done without tracking
336              // it per region.
337              if (!flushOneForGlobalPressure(type)) {
338                // Wasn't able to flush any region, but we're above low water mark
339                // This is unlikely to happen, but might happen when closing the
340                // entire server - another thread is flushing regions. We'll just
341                // sleep a little bit to avoid spinning, and then pretend that
342                // we flushed one, so anyone blocked will check again
343                Thread.sleep(1000);
344                wakeUpIfBlocking();
345              }
346              // Enqueue another one of these tokens so we'll wake up again
347              wakeupFlushThread();
348            }
349            continue;
350          }
351          FlushRegionEntry fre = (FlushRegionEntry) fqe;
352          if (!flushRegion(fre)) {
353            break;
354          }
355        } catch (InterruptedException ex) {
356          continue;
357        } catch (ConcurrentModificationException ex) {
358          continue;
359        } catch (Exception ex) {
360          LOG.error("Cache flusher failed for entry " + fqe, ex);
361          if (!server.checkFileSystem()) {
362            break;
363          }
364        }
365      }
366      synchronized (regionsInQueue) {
367        regionsInQueue.clear();
368        flushQueue.clear();
369      }
370
371      // Signal anyone waiting, so they see the close flag
372      wakeUpIfBlocking();
373      LOG.info(getName() + " exiting");
374    }
375  }
376
377  private void wakeupFlushThread() {
378    if (wakeupPending.compareAndSet(false, true)) {
379      flushQueue.add(WAKEUPFLUSH_INSTANCE);
380    }
381  }
382
383  private HRegion getBiggestMemStoreRegion(SortedMap<Long, Collection<HRegion>> regionsBySize,
384    Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
385    synchronized (regionsInQueue) {
386      for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
387        for (HRegion region : entry.getValue()) {
388          if (excludedRegions.contains(region)) {
389            continue;
390          }
391
392          if (region.writestate.flushing || !region.writestate.writesEnabled) {
393            continue;
394          }
395
396          if (checkStoreFileCount && isTooManyStoreFiles(region)) {
397            continue;
398          }
399          return region;
400        }
401      }
402    }
403    return null;
404  }
405
406  private HRegion getBiggestMemStoreOfRegionReplica(
407    SortedMap<Long, Collection<HRegion>> regionsBySize, Set<HRegion> excludedRegions) {
408    synchronized (regionsInQueue) {
409      for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
410        for (HRegion region : entry.getValue()) {
411          if (excludedRegions.contains(region)) {
412            continue;
413          }
414
415          if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
416            continue;
417          }
418          return region;
419        }
420      }
421    }
422    return null;
423  }
424
425  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
426    try {
427      return region.refreshStoreFiles();
428    } catch (IOException e) {
429      LOG.warn("Refreshing store files failed with exception", e);
430    }
431    return false;
432  }
433
434  /**
435   * Return the FlushType if global memory usage is above the high watermark
436   */
437  private FlushType isAboveHighWaterMark() {
438    return server.getRegionServerAccounting().isAboveHighWaterMark();
439  }
440
441  /**
442   * Return the FlushType if we're above the low watermark
443   */
444  private FlushType isAboveLowWaterMark() {
445    return server.getRegionServerAccounting().isAboveLowWaterMark();
446  }
447
448  @Override
449  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
450    return this.requestFlush(r, null, tracker);
451  }
452
453  @Override
454  public boolean requestFlush(HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
455    synchronized (regionsInQueue) {
456      FlushRegionEntry existFqe = regionsInQueue.get(r);
457      if (existFqe != null) {
458        // if a delayed one exists and not reach the time to execute, just remove it
459        if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
460          LOG.info("Remove the existing delayed flush entry for {}, "
461            + "because we need to flush it immediately", r);
462          this.regionsInQueue.remove(r);
463          this.flushQueue.remove(existFqe);
464          r.decrementFlushesQueuedCount();
465        } else {
466          tracker.notExecuted("Flush already requested on " + r);
467          return false;
468        }
469      }
470
471      // This entry has no delay so it will be added at the top of the flush
472      // queue. It'll come out near immediately.
473      FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
474      this.regionsInQueue.put(r, fqe);
475      this.flushQueue.add(fqe);
476      r.incrementFlushesQueuedCount();
477      return true;
478    }
479  }
480
481  @Override
482  public boolean requestDelayedFlush(HRegion r, long delay) {
483    synchronized (regionsInQueue) {
484      if (!regionsInQueue.containsKey(r)) {
485        // This entry has some delay
486        FlushRegionEntry fqe = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
487        fqe.requeue(delay);
488        this.regionsInQueue.put(r, fqe);
489        this.flushQueue.add(fqe);
490        r.incrementFlushesQueuedCount();
491        return true;
492      }
493      return false;
494    }
495  }
496
497  public int getFlushQueueSize() {
498    return flushQueue.size();
499  }
500
501  /**
502   * Only interrupt once it's done with a run through the work loop.
503   */
504  void interruptIfNecessary() {
505    lock.writeLock().lock();
506    try {
507      for (FlushHandler flushHander : flushHandlers) {
508        if (flushHander != null) flushHander.interrupt();
509      }
510    } finally {
511      lock.writeLock().unlock();
512    }
513  }
514
515  synchronized void start(UncaughtExceptionHandler eh) {
516    ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
517      .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
518      .setDaemon(true).setUncaughtExceptionHandler(eh).build();
519    for (int i = 0; i < flushHandlers.length; i++) {
520      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
521      flusherThreadFactory.newThread(flushHandlers[i]);
522      flushHandlers[i].start();
523    }
524  }
525
526  boolean isAlive() {
527    for (FlushHandler flushHander : flushHandlers) {
528      if (flushHander != null && flushHander.isAlive()) {
529        return true;
530      }
531    }
532    return false;
533  }
534
535  void join() {
536    for (FlushHandler flushHander : flushHandlers) {
537      if (flushHander != null) {
538        Threads.shutdown(flushHander);
539      }
540    }
541  }
542
543  /**
544   * A flushRegion that checks store file count. If too many, puts the flush on delay queue to retry
545   * later. n * @return true if the region was successfully flushed, false otherwise. If false,
546   * there will be accompanying log messages explaining why the region was not flushed.
547   */
548  private boolean flushRegion(final FlushRegionEntry fqe) {
549    HRegion region = fqe.region;
550    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
551      if (fqe.isMaximumWait(this.blockingWaitTime)) {
552        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime)
553          + "ms on a compaction to clean up 'too many store files'; waited "
554          + "long enough... proceeding with flush of "
555          + region.getRegionInfo().getRegionNameAsString());
556      } else {
557        // If this is first time we've been put off, then emit a log message.
558        if (fqe.getRequeueCount() <= 0) {
559          // Note: We don't impose blockingStoreFiles constraint on meta regions
560          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
561            region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
562            this.blockingWaitTime);
563          final CompactSplit compactSplitThread = server.getCompactSplitThread();
564          if (!compactSplitThread.requestSplit(region)) {
565            try {
566              compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
567            } catch (IOException e) {
568              e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
569              LOG.error("Cache flush failed for region "
570                + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
571            }
572          }
573        }
574
575        // Put back on the queue. Have it come back out of the queue
576        // after a delay of this.blockingWaitTime / 100 ms.
577        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
578        // Tell a lie, it's not flushed but it's ok
579        return true;
580      }
581    }
582    return flushRegion(region, false, fqe.families, fqe.getTracker());
583  }
584
585  /**
586   * Flush a region.
587   * @param region         Region to flush.
588   * @param emergencyFlush Set if we are being force flushed. If true the region needs to be removed
589   *                       from the flush queue. If false, when we were called from the main flusher
590   *                       run loop and we got the entry to flush by calling poll on the flush queue
591   *                       (which removed it).
592   * @param families       stores of region to flush.
593   * @return true if the region was successfully flushed, false otherwise. If false, there will be
594   *         accompanying log messages explaining why the region was not flushed.
595   */
596  private boolean flushRegion(HRegion region, boolean emergencyFlush, List<byte[]> families,
597    FlushLifeCycleTracker tracker) {
598    synchronized (this.regionsInQueue) {
599      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
600      // Use the start time of the FlushRegionEntry if available
601      if (fqe != null && emergencyFlush) {
602        // Need to remove from region from delay queue. When NOT an
603        // emergencyFlush, then item was removed via a flushQueue.poll.
604        flushQueue.remove(fqe);
605      }
606    }
607
608    tracker.beforeExecution();
609    lock.readLock().lock();
610    final CompactSplit compactSplitThread = server.getCompactSplitThread();
611    try {
612      notifyFlushRequest(region, emergencyFlush);
613      FlushResult flushResult = region.flushcache(families, false, tracker);
614      boolean shouldCompact = flushResult.isCompactionNeeded();
615      // We just want to check the size
616      boolean shouldSplit = region.checkSplit().isPresent();
617      if (shouldSplit) {
618        compactSplitThread.requestSplit(region);
619      } else if (shouldCompact) {
620        compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
621      }
622    } catch (DroppedSnapshotException ex) {
623      // Cache flush can fail in a few places. If it fails in a critical
624      // section, we get a DroppedSnapshotException and a replay of wal
625      // is required. Currently the only way to do this is a restart of
626      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
627      // where hdfs was bad but passed the hdfs check).
628      server.abort("Replay of WAL required. Forcing server shutdown", ex);
629      return false;
630    } catch (IOException ex) {
631      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
632      LOG.error("Cache flush failed" + (region != null
633        ? (" for region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
634        : ""), ex);
635      if (!server.checkFileSystem()) {
636        return false;
637      }
638    } finally {
639      lock.readLock().unlock();
640      wakeUpIfBlocking();
641      tracker.afterExecution();
642    }
643    return true;
644  }
645
646  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
647    FlushType type = null;
648    if (emergencyFlush) {
649      type = isAboveHighWaterMark();
650    }
651    if (type == null) {
652      type = isAboveLowWaterMark();
653    }
654    for (FlushRequestListener listener : flushRequestListeners) {
655      listener.flushRequested(type, region);
656    }
657  }
658
659  private void wakeUpIfBlocking() {
660    synchronized (blockSignal) {
661      blockSignal.notifyAll();
662    }
663  }
664
665  private boolean isTooManyStoreFiles(Region region) {
666
667    // When compaction is disabled, the region is flushable
668    if (!region.getTableDescriptor().isCompactionEnabled()) {
669      return false;
670    }
671
672    for (Store store : region.getStores()) {
673      if (store.hasTooManyStoreFiles()) {
674        return true;
675      }
676    }
677    return false;
678  }
679
680  private int getStoreFileCount(Region region) {
681    int count = 0;
682    for (Store store : region.getStores()) {
683      count += store.getStorefilesCount();
684    }
685    return count;
686  }
687
688  /**
689   * Check if the regionserver's memstore memory usage is greater than the limit. If so, flush
690   * regions with the biggest memstores until we're down to the lower limit. This method blocks
691   * callers until we're down to a safe amount of memstore consumption.
692   */
693  public void reclaimMemStoreMemory() {
694    Span span =
695      TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan();
696    try (Scope scope = span.makeCurrent()) {
697      FlushType flushType = isAboveHighWaterMark();
698      if (flushType != FlushType.NORMAL) {
699        span.addEvent("Force Flush. We're above high water mark.");
700        long start = EnvironmentEdgeManager.currentTime();
701        long nextLogTimeMs = start;
702        synchronized (this.blockSignal) {
703          boolean blocked = false;
704          long startTime = 0;
705          boolean interrupted = false;
706          try {
707            flushType = isAboveHighWaterMark();
708            while (flushType != FlushType.NORMAL && !server.isStopped()) {
709              if (!blocked) {
710                startTime = EnvironmentEdgeManager.currentTime();
711                if (!server.getRegionServerAccounting().isOffheap()) {
712                  logMsg("global memstore heapsize",
713                    server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
714                    server.getRegionServerAccounting().getGlobalMemStoreLimit());
715                } else {
716                  switch (flushType) {
717                    case ABOVE_OFFHEAP_HIGHER_MARK:
718                      logMsg("the global offheap memstore datasize",
719                        server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
720                        server.getRegionServerAccounting().getGlobalMemStoreLimit());
721                      break;
722                    case ABOVE_ONHEAP_HIGHER_MARK:
723                      logMsg("global memstore heapsize",
724                        server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
725                        server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
726                      break;
727                    default:
728                      break;
729                  }
730                }
731              }
732              blocked = true;
733              wakeupFlushThread();
734              try {
735                // we should be able to wait forever, but we've seen a bug where
736                // we miss a notify, so put a 5 second bound on it at least.
737                blockSignal.wait(5 * 1000);
738              } catch (InterruptedException ie) {
739                LOG.warn("Interrupted while waiting");
740                interrupted = true;
741              }
742              long nowMs = EnvironmentEdgeManager.currentTime();
743              if (nowMs >= nextLogTimeMs) {
744                LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
745                nextLogTimeMs = nowMs + 1000;
746              }
747              flushType = isAboveHighWaterMark();
748            }
749          } finally {
750            if (interrupted) {
751              Thread.currentThread().interrupt();
752            }
753          }
754
755          if (blocked) {
756            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
757            if (totalTime > 0) {
758              this.updatesBlockedMsHighWater.add(totalTime);
759            }
760            LOG.info("Unblocking updates for server " + server.toString());
761          }
762        }
763      } else {
764        flushType = isAboveLowWaterMark();
765        if (flushType != FlushType.NORMAL) {
766          wakeupFlushThread();
767        }
768        span.end();
769      }
770    }
771  }
772
773  private void logMsg(String type, long val, long max) {
774    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
775      TraditionalBinaryPrefix.long2String(val, "", 1),
776      TraditionalBinaryPrefix.long2String(max, "", 1));
777  }
778
779  @Override
780  public String toString() {
781    return "flush_queue=" + flushQueue.size();
782  }
783
784  public String dumpQueue() {
785    StringBuilder queueList = new StringBuilder();
786    queueList.append("Flush Queue Queue dump:\n");
787    queueList.append("  Flush Queue:\n");
788    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
789
790    while (it.hasNext()) {
791      queueList.append("    " + it.next().toString());
792      queueList.append("\n");
793    }
794
795    return queueList.toString();
796  }
797
798  /**
799   * Register a MemstoreFlushListener n
800   */
801  @Override
802  public void registerFlushRequestListener(final FlushRequestListener listener) {
803    this.flushRequestListeners.add(listener);
804  }
805
806  /**
807   * Unregister the listener from MemstoreFlushListeners n * @return true when passed listener is
808   * unregistered successfully.
809   */
810  @Override
811  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
812    return this.flushRequestListeners.remove(listener);
813  }
814
815  /**
816   * Sets the global memstore limit to a new size. n
817   */
818  @Override
819  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
820    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
821    reclaimMemStoreMemory();
822  }
823
824  interface FlushQueueEntry extends Delayed {
825  }
826
827  /**
828   * Datastructure used in the flush queue. Holds region and retry count. Keeps tabs on how old this
829   * object is. Implements {@link Delayed}. On construction, the delay is zero. When added to a
830   * delay queue, we'll come out near immediately. Call {@link #requeue(long)} passing delay in
831   * milliseconds before readding to delay queue if you want it to stay there a while.
832   */
833  static class FlushRegionEntry implements FlushQueueEntry {
834    private final HRegion region;
835
836    private final long createTime;
837    private long whenToExpire;
838    private int requeueCount = 0;
839
840    private final List<byte[]> families;
841
842    private final FlushLifeCycleTracker tracker;
843
844    FlushRegionEntry(final HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
845      this.region = r;
846      this.createTime = EnvironmentEdgeManager.currentTime();
847      this.whenToExpire = this.createTime;
848      this.families = families;
849      this.tracker = tracker;
850    }
851
852    /**
853     * n * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
854     */
855    public boolean isMaximumWait(final long maximumWait) {
856      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
857    }
858
859    /**
860     * @return True if the entry is a delay flush task
861     */
862    protected boolean isDelay() {
863      return this.whenToExpire > this.createTime;
864    }
865
866    /**
867     * @return Count of times {@link #requeue(long)} was called; i.e this is number of times we've
868     *         been requeued.
869     */
870    public int getRequeueCount() {
871      return this.requeueCount;
872    }
873
874    public FlushLifeCycleTracker getTracker() {
875      return tracker;
876    }
877
878    /**
879     * @param when When to expire, when to come up out of the queue. Specify in milliseconds. This
880     *             method adds EnvironmentEdgeManager.currentTime() to whatever you pass.
881     * @return This.
882     */
883    public FlushRegionEntry requeue(final long when) {
884      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
885      this.requeueCount++;
886      return this;
887    }
888
889    @Override
890    public long getDelay(TimeUnit unit) {
891      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
892        TimeUnit.MILLISECONDS);
893    }
894
895    @Override
896    public int compareTo(Delayed other) {
897      // Delay is compared first. If there is a tie, compare region's hash code
898      int ret =
899        Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS))
900          .intValue();
901      if (ret != 0) {
902        return ret;
903      }
904      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
905      return hashCode() - otherEntry.hashCode();
906    }
907
908    @Override
909    public String toString() {
910      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
911    }
912
913    @Override
914    public int hashCode() {
915      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
916      return hash ^ region.hashCode();
917    }
918
919    @Override
920    public boolean equals(Object obj) {
921      if (this == obj) {
922        return true;
923      }
924      if (obj == null || getClass() != obj.getClass()) {
925        return false;
926      }
927      FlushRegionEntry other = (FlushRegionEntry) obj;
928      if (
929        !Bytes.equals(this.region.getRegionInfo().getRegionName(),
930          other.region.getRegionInfo().getRegionName())
931      ) {
932        return false;
933      }
934      return compareTo(other) == 0;
935    }
936  }
937}