001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.context.Scope;
022import java.io.IOException;
023import java.lang.Thread.UncaughtExceptionHandler;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.ConcurrentModificationException;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.SortedMap;
034import java.util.concurrent.BlockingQueue;
035import java.util.concurrent.DelayQueue;
036import java.util.concurrent.Delayed;
037import java.util.concurrent.ThreadFactory;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.LongAdder;
042import java.util.concurrent.locks.ReentrantReadWriteLock;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.DroppedSnapshotException;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.client.RegionReplicaUtil;
047import org.apache.hadoop.hbase.conf.ConfigurationObserver;
048import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
049import org.apache.hadoop.hbase.trace.TraceUtil;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.hadoop.ipc.RemoteException;
055import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061
/**
 * Flushes memstores on request. Flush requests are queued and served by a set of
 * {@code FlushHandler} threads. NOTE: The handlers are plain threads rather than a Chore because
 * their sleep (a timed poll on the flush queue) ends as soon as there is something to do, whereas
 * the Chore sleep time is invariant.
 * @see FlushRequester
 */
068@InterfaceAudience.Private
069public class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  // Configuration given to the constructor; also read during global-pressure flushes for the
  // region-replica store-file refresh settings.
  private Configuration conf;
  // These two data members go together. Any entry in the one must have
  // a corresponding entry in the other.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  // Set by wakeupFlushThread() when it enqueues WAKEUPFLUSH_INSTANCE, so repeated calls do not
  // enqueue more tokens; each handler clears it right before polling the queue.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  // Timeout (ms) of each handler's poll on flushQueue; on timeout the handler re-checks the
  // global low water mark.
  private final long threadWakeFrequency;
  private final HRegionServer server;
  // Write lock: interruptIfNecessary(). Read lock: held while a region is flushed and while the
  // handler threads are started, inspected or shut down.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Monitor that reclaimMemStoreMemory() callers wait on; notified by wakeUpIfBlocking().
  private final Object blockSignal = new Object();

  // Maximum time (ms, from "hbase.hstore.blockingWaitTime") a flush is postponed because the
  // region has too many store files.
  private long blockingWaitTime;
  // Accumulated time (ms) that updates were blocked above the global high water mark.
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  // Flush worker threads; the array length is the configured handler count.
  private FlushHandler[] flushHandlers;

  // NOTE(review): presumably used to number handler threads; its use is outside this view.
  private final AtomicInteger flusherIdGen = new AtomicInteger();

  // Builds handler threads (daemon, with the uncaught-exception handler passed to start()).
  private ThreadFactory flusherThreadFactory;

  // Listeners told about each region flush (see notifyFlushRequest). NOTE(review): a plain
  // ArrayList, iterated by handler threads; concurrent (un)registration can surface as the
  // ConcurrentModificationException that the handler loop catches.
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
094
  /**
   * Singleton instance inserted into flush queue used for signaling. It belongs to no region: a
   * handler that polls it (recognised by identity) re-checks global memstore pressure instead of
   * flushing a particular region. Its delay is always 0, so it is available immediately.
   */
  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

    // Always -1, even against itself. This deliberately ignores the usual compareTo contract so
    // that the token tends to sort ahead of other entries.
    @Override
    public int compareTo(Delayed o) {
      return -1;
    }

    // Identity equality: there is only ever this one token.
    @Override
    public boolean equals(Object obj) {
      return obj == this;
    }

    @Override
    public int hashCode() {
      return 42;
    }
  };
119
120  /**
121   *   */
122  public MemStoreFlusher(final Configuration conf, final HRegionServer server) {
123    super();
124    this.conf = conf;
125    this.server = server;
126    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
127    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
128    int handlerCount = 0;
129    if (server != null) {
130      handlerCount = getHandlerCount(conf);
131      LOG.info("globalMemStoreLimit="
132        + TraditionalBinaryPrefix
133          .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
134        + ", globalMemStoreLimitLowMark="
135        + TraditionalBinaryPrefix.long2String(
136          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
137        + ", Offheap=" + (this.server.getRegionServerAccounting().isOffheap()));
138    }
139    this.flushHandlers = new FlushHandler[handlerCount];
140  }
141
142  public LongAdder getUpdatesBlockedMsHighWater() {
143    return this.updatesBlockedMsHighWater;
144  }
145
146  /**
147   * The memstore across all regions has exceeded the low water mark. Pick one region to flush and
148   * flush it synchronously (this is called from the flush thread)
149   * @return true if successful
150   */
151  private boolean flushOneForGlobalPressure(FlushType flushType) {
152    SortedMap<Long, Collection<HRegion>> regionsBySize = null;
153    switch (flushType) {
154      case ABOVE_OFFHEAP_HIGHER_MARK:
155      case ABOVE_OFFHEAP_LOWER_MARK:
156        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
157        break;
158      case ABOVE_ONHEAP_HIGHER_MARK:
159      case ABOVE_ONHEAP_LOWER_MARK:
160      default:
161        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
162    }
163    Set<HRegion> excludedRegions = new HashSet<>();
164
165    double secondaryMultiplier =
166      ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
167
168    boolean flushedOne = false;
169    while (!flushedOne) {
170      // Find the biggest region that doesn't have too many storefiles (might be null!)
171      HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
172      // Find the biggest region, total, even if it might have too many flushes.
173      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
174      // Find the biggest region that is a secondary region
175      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
176      if (bestAnyRegion == null) {
177        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
178        bestAnyRegion = bestRegionReplica;
179      }
180      if (bestAnyRegion == null) {
181        LOG.error("Above memory mark but there are no flushable regions!");
182        return false;
183      }
184
185      HRegion regionToFlush;
186      long bestAnyRegionSize;
187      long bestFlushableRegionSize;
188      switch (flushType) {
189        case ABOVE_OFFHEAP_HIGHER_MARK:
190        case ABOVE_OFFHEAP_LOWER_MARK:
191          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
192          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
193          break;
194
195        case ABOVE_ONHEAP_HIGHER_MARK:
196        case ABOVE_ONHEAP_LOWER_MARK:
197          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
198          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
199          break;
200
201        default:
202          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
203          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
204      }
205      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
206        // Even if it's not supposed to be flushed, pick a region if it's more than twice
207        // as big as the best flushable one - otherwise when we're under pressure we make
208        // lots of little flushes and cause lots of compactions, etc, which just makes
209        // life worse!
210        if (LOG.isDebugEnabled()) {
211          LOG.debug("Under global heap pressure: " + "Region "
212            + bestAnyRegion.getRegionInfo().getRegionNameAsString() + " has too many "
213            + "store files, but is " + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
214            + " vs best flushable region's "
215            + TraditionalBinaryPrefix.long2String(bestFlushableRegionSize, "", 1)
216            + ". Choosing the bigger.");
217        }
218        regionToFlush = bestAnyRegion;
219      } else {
220        if (bestFlushableRegion == null) {
221          regionToFlush = bestAnyRegion;
222        } else {
223          regionToFlush = bestFlushableRegion;
224        }
225      }
226
227      long regionToFlushSize;
228      long bestRegionReplicaSize;
229      switch (flushType) {
230        case ABOVE_OFFHEAP_HIGHER_MARK:
231        case ABOVE_OFFHEAP_LOWER_MARK:
232          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
233          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
234          break;
235
236        case ABOVE_ONHEAP_HIGHER_MARK:
237        case ABOVE_ONHEAP_LOWER_MARK:
238          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
239          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
240          break;
241
242        default:
243          regionToFlushSize = regionToFlush.getMemStoreDataSize();
244          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
245      }
246
247      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
248        // A concurrency issue (such as splitting region) may happen such that the online region
249        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
250        // getBiggestMemStoreRegion(). This means that we can come out of the loop
251        LOG.debug("Above memory mark but there is no flushable region");
252        return false;
253      }
254
255      if (
256        regionToFlush == null || (bestRegionReplica != null
257          && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf)
258          && (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))
259      ) {
260        LOG.info("Refreshing storefiles of region " + bestRegionReplica
261          + " due to global heap pressure. Total memstore off heap size="
262          + TraditionalBinaryPrefix
263            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
264          + " memstore heap size=" + TraditionalBinaryPrefix
265            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
266        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
267        if (!flushedOne) {
268          LOG.info("Excluding secondary region " + bestRegionReplica
269            + " - trying to find a different region to refresh files.");
270          excludedRegions.add(bestRegionReplica);
271        }
272      } else {
273        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
274          + "Flush type=" + flushType.toString() + ", Total Memstore Heap size="
275          + TraditionalBinaryPrefix
276            .long2String(server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)
277          + ", Total Memstore Off-Heap size="
278          + TraditionalBinaryPrefix
279            .long2String(server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1)
280          + ", Region memstore size="
281          + TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
282        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);
283
284        if (!flushedOne) {
285          LOG.info("Excluding unflushable region " + regionToFlush
286            + " - trying to find a different region to flush.");
287          excludedRegions.add(regionToFlush);
288        }
289      }
290    }
291    return true;
292  }
293
294  /** Returns Return memstore offheap size or null if <code>r</code> is null */
295  private static long getMemStoreOffHeapSize(HRegion r) {
296    return r == null ? 0 : r.getMemStoreOffHeapSize();
297  }
298
299  /** Returns Return memstore heap size or null if <code>r</code> is null */
300  private static long getMemStoreHeapSize(HRegion r) {
301    return r == null ? 0 : r.getMemStoreHeapSize();
302  }
303
304  /** Returns Return memstore data size or null if <code>r</code> is null */
305  private static long getMemStoreDataSize(HRegion r) {
306    return r == null ? 0 : r.getMemStoreDataSize();
307  }
308
309  private class FlushHandler extends Thread {
310
311    private final AtomicBoolean running = new AtomicBoolean(true);
312
313    private FlushHandler(String name) {
314      super(name);
315    }
316
317    @Override
318    public void run() {
319      while (!server.isStopped() && running.get()) {
320        FlushQueueEntry fqe = null;
321        try {
322          wakeupPending.set(false); // allow someone to wake us up again
323          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
324          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
325            FlushType type = isAboveLowWaterMark();
326            if (type != FlushType.NORMAL) {
327              LOG.debug("Flush thread woke up because memory above low water="
328                + TraditionalBinaryPrefix.long2String(
329                  server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
330              // For offheap memstore, even if the lower water mark was breached due to heap
331              // overhead
332              // we still select the regions based on the region's memstore data size.
333              // TODO : If we want to decide based on heap over head it can be done without tracking
334              // it per region.
335              if (!flushOneForGlobalPressure(type)) {
336                // Wasn't able to flush any region, but we're above low water mark
337                // This is unlikely to happen, but might happen when closing the
338                // entire server - another thread is flushing regions. We'll just
339                // sleep a little bit to avoid spinning, and then pretend that
340                // we flushed one, so anyone blocked will check again
341                Thread.sleep(1000);
342                wakeUpIfBlocking();
343              }
344              // Enqueue another one of these tokens so we'll wake up again
345              wakeupFlushThread();
346            }
347            continue;
348          }
349          FlushRegionEntry fre = (FlushRegionEntry) fqe;
350          if (!flushRegion(fre)) {
351            break;
352          }
353        } catch (InterruptedException ex) {
354          continue;
355        } catch (ConcurrentModificationException ex) {
356          continue;
357        } catch (Exception ex) {
358          LOG.error("Cache flusher failed for entry " + fqe, ex);
359          if (!server.checkFileSystem()) {
360            break;
361          }
362        }
363      }
364
365      if (server.isStopped()) {
366        synchronized (regionsInQueue) {
367          regionsInQueue.clear();
368          flushQueue.clear();
369        }
370
371        // Signal anyone waiting, so they see the close flag
372        wakeUpIfBlocking();
373      }
374      LOG.info(getName() + " exiting");
375    }
376
377    public void shutdown() {
378      if (!running.compareAndSet(true, false)) {
379        LOG.warn("{} is already signaled to shutdown", getName());
380      }
381    }
382  }
383
384  private void wakeupFlushThread() {
385    if (wakeupPending.compareAndSet(false, true)) {
386      flushQueue.add(WAKEUPFLUSH_INSTANCE);
387    }
388  }
389
390  private HRegion getBiggestMemStoreRegion(SortedMap<Long, Collection<HRegion>> regionsBySize,
391    Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
392    synchronized (regionsInQueue) {
393      for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
394        for (HRegion region : entry.getValue()) {
395          if (excludedRegions.contains(region)) {
396            continue;
397          }
398
399          if (region.writestate.flushing || !region.writestate.writesEnabled) {
400            continue;
401          }
402
403          if (checkStoreFileCount && isTooManyStoreFiles(region)) {
404            continue;
405          }
406          return region;
407        }
408      }
409    }
410    return null;
411  }
412
413  private HRegion getBiggestMemStoreOfRegionReplica(
414    SortedMap<Long, Collection<HRegion>> regionsBySize, Set<HRegion> excludedRegions) {
415    synchronized (regionsInQueue) {
416      for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
417        for (HRegion region : entry.getValue()) {
418          if (excludedRegions.contains(region)) {
419            continue;
420          }
421
422          if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
423            continue;
424          }
425          return region;
426        }
427      }
428    }
429    return null;
430  }
431
432  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
433    try {
434      return region.refreshStoreFiles();
435    } catch (IOException e) {
436      LOG.warn("Refreshing store files failed with exception", e);
437    }
438    return false;
439  }
440
441  /**
442   * Return the FlushType if global memory usage is above the high watermark
443   */
444  private FlushType isAboveHighWaterMark() {
445    return server.getRegionServerAccounting().isAboveHighWaterMark();
446  }
447
448  /**
449   * Return the FlushType if we're above the low watermark
450   */
451  private FlushType isAboveLowWaterMark() {
452    return server.getRegionServerAccounting().isAboveLowWaterMark();
453  }
454
455  @Override
456  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
457    return this.requestFlush(r, null, tracker);
458  }
459
460  @Override
461  public boolean requestFlush(HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
462    synchronized (regionsInQueue) {
463      FlushRegionEntry existFqe = regionsInQueue.get(r);
464      if (existFqe != null) {
465        // if a delayed one exists and not reach the time to execute, just remove it
466        if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
467          LOG.info("Remove the existing delayed flush entry for {}, "
468            + "because we need to flush it immediately", r);
469          this.regionsInQueue.remove(r);
470          this.flushQueue.remove(existFqe);
471          r.decrementFlushesQueuedCount();
472        } else {
473          tracker.notExecuted("Flush already requested on " + r);
474          return false;
475        }
476      }
477
478      // This entry has no delay so it will be added at the top of the flush
479      // queue. It'll come out near immediately.
480      FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
481      this.regionsInQueue.put(r, fqe);
482      this.flushQueue.add(fqe);
483      r.incrementFlushesQueuedCount();
484      return true;
485    }
486  }
487
488  @Override
489  public boolean requestDelayedFlush(HRegion r, long delay) {
490    synchronized (regionsInQueue) {
491      if (!regionsInQueue.containsKey(r)) {
492        // This entry has some delay
493        FlushRegionEntry fqe = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
494        fqe.requeue(delay);
495        this.regionsInQueue.put(r, fqe);
496        this.flushQueue.add(fqe);
497        r.incrementFlushesQueuedCount();
498        return true;
499      }
500      return false;
501    }
502  }
503
504  public int getFlushQueueSize() {
505    return flushQueue.size();
506  }
507
508  /**
509   * Only interrupt once it's done with a run through the work loop.
510   */
511  void interruptIfNecessary() {
512    lock.writeLock().lock();
513    try {
514      for (FlushHandler flushHandler : flushHandlers) {
515        if (flushHandler != null) {
516          flushHandler.interrupt();
517        }
518      }
519    } finally {
520      lock.writeLock().unlock();
521    }
522  }
523
524  synchronized void start(UncaughtExceptionHandler eh) {
525    this.flusherThreadFactory =
526      new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build();
527    lock.readLock().lock();
528    try {
529      startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length);
530    } finally {
531      lock.readLock().unlock();
532    }
533  }
534
535  boolean isAlive() {
536    lock.readLock().lock();
537    try {
538      for (FlushHandler flushHandler : flushHandlers) {
539        if (flushHandler != null && flushHandler.isAlive()) {
540          return true;
541        }
542      }
543      return false;
544    } finally {
545      lock.readLock().unlock();
546    }
547  }
548
549  void shutdown() {
550    lock.readLock().lock();
551    try {
552      for (FlushHandler flushHandler : flushHandlers) {
553        if (flushHandler != null) {
554          Threads.shutdown(flushHandler);
555        }
556      }
557    } finally {
558      lock.readLock().unlock();
559    }
560  }
561
562  /**
563   * A flushRegion that checks store file count. If too many, puts the flush on delay queue to retry
564   * later.
565   * @return true if the region was successfully flushed, false otherwise. If false, there will be
566   *         accompanying log messages explaining why the region was not flushed.
567   */
568  private boolean flushRegion(final FlushRegionEntry fqe) {
569    HRegion region = fqe.region;
570    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
571      if (fqe.isMaximumWait(this.blockingWaitTime)) {
572        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime)
573          + "ms on a compaction to clean up 'too many store files'; waited "
574          + "long enough... proceeding with flush of "
575          + region.getRegionInfo().getRegionNameAsString());
576      } else {
577        // If this is first time we've been put off, then emit a log message.
578        if (fqe.getRequeueCount() <= 0) {
579          // Note: We don't impose blockingStoreFiles constraint on meta regions
580          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
581            region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
582            this.blockingWaitTime);
583          final CompactSplit compactSplitThread = server.getCompactSplitThread();
584          if (!compactSplitThread.requestSplit(region)) {
585            try {
586              compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
587            } catch (IOException e) {
588              e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
589              LOG.error("Cache flush failed for region "
590                + Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
591            }
592          }
593        }
594
595        // Put back on the queue. Have it come back out of the queue
596        // after a delay of this.blockingWaitTime / 100 ms.
597        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
598        // Tell a lie, it's not flushed but it's ok
599        return true;
600      }
601    }
602    return flushRegion(region, false, fqe.families, fqe.getTracker());
603  }
604
605  /**
606   * Flush a region.
607   * @param region         Region to flush.
608   * @param emergencyFlush Set if we are being force flushed. If true the region needs to be removed
609   *                       from the flush queue. If false, when we were called from the main flusher
610   *                       run loop and we got the entry to flush by calling poll on the flush queue
611   *                       (which removed it).
612   * @param families       stores of region to flush.
613   * @return true if the region was successfully flushed, false otherwise. If false, there will be
614   *         accompanying log messages explaining why the region was not flushed.
615   */
616  private boolean flushRegion(HRegion region, boolean emergencyFlush, List<byte[]> families,
617    FlushLifeCycleTracker tracker) {
618    synchronized (this.regionsInQueue) {
619      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
620      // Use the start time of the FlushRegionEntry if available
621      if (fqe != null && emergencyFlush) {
622        // Need to remove from region from delay queue. When NOT an
623        // emergencyFlush, then item was removed via a flushQueue.poll.
624        flushQueue.remove(fqe);
625      }
626    }
627
628    tracker.beforeExecution();
629    lock.readLock().lock();
630    final CompactSplit compactSplitThread = server.getCompactSplitThread();
631    try {
632      notifyFlushRequest(region, emergencyFlush);
633      FlushResult flushResult = region.flushcache(families, false, tracker);
634      boolean shouldCompact = flushResult.isCompactionNeeded();
635      // We just want to check the size
636      boolean shouldSplit = region.checkSplit().isPresent();
637      if (shouldSplit) {
638        compactSplitThread.requestSplit(region);
639      } else if (shouldCompact) {
640        compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
641      }
642    } catch (DroppedSnapshotException ex) {
643      // Cache flush can fail in a few places. If it fails in a critical
644      // section, we get a DroppedSnapshotException and a replay of wal
645      // is required. Currently the only way to do this is a restart of
646      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
647      // where hdfs was bad but passed the hdfs check).
648      server.abort("Replay of WAL required. Forcing server shutdown", ex);
649      return false;
650    } catch (IOException ex) {
651      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
652      LOG.error("Cache flush failed" + (region != null
653        ? (" for region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
654        : ""), ex);
655      if (!server.checkFileSystem()) {
656        return false;
657      }
658    } finally {
659      lock.readLock().unlock();
660      wakeUpIfBlocking();
661      tracker.afterExecution();
662    }
663    return true;
664  }
665
666  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
667    FlushType type = null;
668    if (emergencyFlush) {
669      type = isAboveHighWaterMark();
670    }
671    if (type == null) {
672      type = isAboveLowWaterMark();
673    }
674    for (FlushRequestListener listener : flushRequestListeners) {
675      listener.flushRequested(type, region);
676    }
677  }
678
679  private void wakeUpIfBlocking() {
680    synchronized (blockSignal) {
681      blockSignal.notifyAll();
682    }
683  }
684
685  private boolean isTooManyStoreFiles(Region region) {
686
687    // When compaction is disabled, the region is flushable
688    if (!region.getTableDescriptor().isCompactionEnabled()) {
689      return false;
690    }
691
692    for (Store store : region.getStores()) {
693      if (store.hasTooManyStoreFiles()) {
694        return true;
695      }
696    }
697    return false;
698  }
699
700  private int getStoreFileCount(Region region) {
701    int count = 0;
702    for (Store store : region.getStores()) {
703      count += store.getStorefilesCount();
704    }
705    return count;
706  }
707
708  /**
709   * Check if the regionserver's memstore memory usage is greater than the limit. If so, flush
710   * regions with the biggest memstores until we're down to the lower limit. This method blocks
711   * callers until we're down to a safe amount of memstore consumption.
712   */
713  public void reclaimMemStoreMemory() {
714    Span span =
715      TraceUtil.getGlobalTracer().spanBuilder("MemStoreFluser.reclaimMemStoreMemory").startSpan();
716    try (Scope scope = span.makeCurrent()) {
717      FlushType flushType = isAboveHighWaterMark();
718      if (flushType != FlushType.NORMAL) {
719        span.addEvent("Force Flush. We're above high water mark.");
720        long start = EnvironmentEdgeManager.currentTime();
721        long nextLogTimeMs = start;
722        synchronized (this.blockSignal) {
723          boolean blocked = false;
724          long startTime = 0;
725          boolean interrupted = false;
726          try {
727            flushType = isAboveHighWaterMark();
728            while (flushType != FlushType.NORMAL && !server.isStopped()) {
729              if (!blocked) {
730                startTime = EnvironmentEdgeManager.currentTime();
731                if (!server.getRegionServerAccounting().isOffheap()) {
732                  logMsg("global memstore heapsize",
733                    server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
734                    server.getRegionServerAccounting().getGlobalMemStoreLimit());
735                } else {
736                  switch (flushType) {
737                    case ABOVE_OFFHEAP_HIGHER_MARK:
738                      logMsg("the global offheap memstore datasize",
739                        server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
740                        server.getRegionServerAccounting().getGlobalMemStoreLimit());
741                      break;
742                    case ABOVE_ONHEAP_HIGHER_MARK:
743                      logMsg("global memstore heapsize",
744                        server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
745                        server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
746                      break;
747                    default:
748                      break;
749                  }
750                }
751              }
752              blocked = true;
753              wakeupFlushThread();
754              try {
755                // we should be able to wait forever, but we've seen a bug where
756                // we miss a notify, so put a 5 second bound on it at least.
757                blockSignal.wait(5 * 1000);
758              } catch (InterruptedException ie) {
759                LOG.warn("Interrupted while waiting");
760                interrupted = true;
761              }
762              long nowMs = EnvironmentEdgeManager.currentTime();
763              if (nowMs >= nextLogTimeMs) {
764                LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
765                nextLogTimeMs = nowMs + 1000;
766              }
767              flushType = isAboveHighWaterMark();
768            }
769          } finally {
770            if (interrupted) {
771              Thread.currentThread().interrupt();
772            }
773          }
774
775          if (blocked) {
776            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
777            if (totalTime > 0) {
778              this.updatesBlockedMsHighWater.add(totalTime);
779            }
780            LOG.info("Unblocking updates for server " + server.toString());
781          }
782        }
783      } else {
784        flushType = isAboveLowWaterMark();
785        if (flushType != FlushType.NORMAL) {
786          wakeupFlushThread();
787        }
788        span.end();
789      }
790    }
791  }
792
793  private void logMsg(String type, long val, long max) {
794    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
795      TraditionalBinaryPrefix.long2String(val, "", 1),
796      TraditionalBinaryPrefix.long2String(max, "", 1));
797  }
798
799  @Override
800  public String toString() {
801    return "flush_queue=" + flushQueue.size();
802  }
803
804  public String dumpQueue() {
805    StringBuilder queueList = new StringBuilder();
806    queueList.append("Flush Queue Queue dump:\n");
807    queueList.append("  Flush Queue:\n");
808    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
809
810    while (it.hasNext()) {
811      queueList.append("    " + it.next().toString());
812      queueList.append("\n");
813    }
814
815    return queueList.toString();
816  }
817
818  /**
819   * Register a MemstoreFlushListener
820   */
821  @Override
822  public void registerFlushRequestListener(final FlushRequestListener listener) {
823    this.flushRequestListeners.add(listener);
824  }
825
826  /**
827   * Unregister the listener from MemstoreFlushListeners
828   * @return true when passed listener is unregistered successfully.
829   */
830  @Override
831  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
832    return this.flushRequestListeners.remove(listener);
833  }
834
835  /**
836   * Sets the global memstore limit to a new size.
837   */
838  @Override
839  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
840    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
841    reclaimMemStoreMemory();
842  }
843
844  interface FlushQueueEntry extends Delayed {
845  }
846
847  /**
848   * Datastructure used in the flush queue. Holds region and retry count. Keeps tabs on how old this
849   * object is. Implements {@link Delayed}. On construction, the delay is zero. When added to a
850   * delay queue, we'll come out near immediately. Call {@link #requeue(long)} passing delay in
851   * milliseconds before readding to delay queue if you want it to stay there a while.
852   */
853  static class FlushRegionEntry implements FlushQueueEntry {
854    private final HRegion region;
855
856    private final long createTime;
857    private long whenToExpire;
858    private int requeueCount = 0;
859
860    private final List<byte[]> families;
861
862    private final FlushLifeCycleTracker tracker;
863
864    FlushRegionEntry(final HRegion r, List<byte[]> families, FlushLifeCycleTracker tracker) {
865      this.region = r;
866      this.createTime = EnvironmentEdgeManager.currentTime();
867      this.whenToExpire = this.createTime;
868      this.families = families;
869      this.tracker = tracker;
870    }
871
872    /** Returns True if we have been delayed > <code>maximumWait</code> milliseconds. */
873    public boolean isMaximumWait(final long maximumWait) {
874      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
875    }
876
877    /** Returns True if the entry is a delay flush task */
878    protected boolean isDelay() {
879      return this.whenToExpire > this.createTime;
880    }
881
882    /**
883     * @return Count of times {@link #requeue(long)} was called; i.e this is number of times we've
884     *         been requeued.
885     */
886    public int getRequeueCount() {
887      return this.requeueCount;
888    }
889
890    public FlushLifeCycleTracker getTracker() {
891      return tracker;
892    }
893
894    /**
895     * @param when When to expire, when to come up out of the queue. Specify in milliseconds. This
896     *             method adds EnvironmentEdgeManager.currentTime() to whatever you pass.
897     * @return This.
898     */
899    public FlushRegionEntry requeue(final long when) {
900      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
901      this.requeueCount++;
902      return this;
903    }
904
905    @Override
906    public long getDelay(TimeUnit unit) {
907      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
908        TimeUnit.MILLISECONDS);
909    }
910
911    @Override
912    public int compareTo(Delayed other) {
913      // Delay is compared first. If there is a tie, compare region's hash code
914      int ret =
915        Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS))
916          .intValue();
917      if (ret != 0) {
918        return ret;
919      }
920      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
921      return hashCode() - otherEntry.hashCode();
922    }
923
924    @Override
925    public String toString() {
926      return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
927    }
928
929    @Override
930    public int hashCode() {
931      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
932      return hash ^ region.hashCode();
933    }
934
935    @Override
936    public boolean equals(Object obj) {
937      if (this == obj) {
938        return true;
939      }
940      if (obj == null || getClass() != obj.getClass()) {
941        return false;
942      }
943      FlushRegionEntry other = (FlushRegionEntry) obj;
944      if (
945        !Bytes.equals(this.region.getRegionInfo().getRegionName(),
946          other.region.getRegionInfo().getRegionName())
947      ) {
948        return false;
949      }
950      return compareTo(other) == 0;
951    }
952  }
953
954  private int getHandlerCount(Configuration conf) {
955    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
956    if (handlerCount < 1) {
957      LOG.warn(
958        "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
959        handlerCount);
960      handlerCount = 1;
961    }
962    return handlerCount;
963  }
964
965  @Override
966  public void onConfigurationChange(Configuration newConf) {
967    int newHandlerCount = getHandlerCount(newConf);
968    if (newHandlerCount != flushHandlers.length) {
969      LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length,
970        newHandlerCount);
971      lock.writeLock().lock();
972      try {
973        FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount);
974        if (newHandlerCount > flushHandlers.length) {
975          startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length);
976        } else {
977          stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length);
978        }
979        flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length);
980        this.flushHandlers = newFlushHandlers;
981      } finally {
982        lock.writeLock().unlock();
983      }
984    }
985  }
986
987  private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
988    if (flusherThreadFactory != null) {
989      for (int i = start; i < end; i++) {
990        flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement());
991        flusherThreadFactory.newThread(flushHandlers[i]);
992        flushHandlers[i].start();
993      }
994    }
995  }
996
997  private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
998    for (int i = start; i < end; i++) {
999      flushHandlers[i].shutdown();
1000      if (LOG.isDebugEnabled()) {
1001        LOG.debug("send shutdown signal to {}", flushHandlers[i].getName());
1002      }
1003    }
1004  }
1005
1006  public int getFlusherCount() {
1007    return flusherIdGen.get();
1008  }
1009}