001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.ArrayList;
024import java.util.ConcurrentModificationException;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.SortedMap;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.DelayQueue;
033import java.util.concurrent.Delayed;
034import java.util.concurrent.ThreadFactory;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.LongAdder;
038import java.util.concurrent.locks.ReentrantReadWriteLock;
039
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.DroppedSnapshotException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.client.RegionReplicaUtil;
044import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
045import org.apache.hadoop.hbase.trace.TraceUtil;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
049import org.apache.hadoop.hbase.util.Threads;
050import org.apache.hadoop.ipc.RemoteException;
051import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
053import org.apache.htrace.core.TraceScope;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
 * Flushes memstores on request, using a set of {@code FlushHandler} threads.
 *
 * NOTE: The handlers are plain Threads rather than Chores because their wait
 * on the flush queue can be cut short when there is something to do, unlike
 * the Chore sleep time which is invariant.
064 *
065 * @see FlushRequester
066 */
067@InterfaceAudience.Private
068class MemStoreFlusher implements FlushRequester {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  // Configuration handed to the constructor.
  private Configuration conf;
  // These two data members go together.  Any entry in the one must have
  // a corresponding entry in the other.
  // Updates that touch both are made while synchronized on regionsInQueue.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  // Set to true (via compareAndSet) when a wake-up token is queued; a handler resets it to
  // false at the top of each loop iteration so that another token may be queued.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  // Maximum time (ms) a handler waits on the flush queue before re-checking memory pressure.
  private final long threadWakeFrequency;
  private final HRegionServer server;
  // Flushes run under the read lock; interruptIfNecessary() takes the write lock.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Monitor that reclaimMemStoreMemory() waits on and wakeUpIfBlocking() notifies.
  private final Object blockSignal = new Object();

  // Read from "hbase.hstore.blockingWaitTime" (ms); bounds how long a flush is put off
  // because a region has too many store files.
  private long blockingWaitTime;
  // Accumulated time (ms) callers spent blocked in reclaimMemStoreMemory().
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  // Slots are filled in by start(); they are null until then.
  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
088
089  /**
090   * Singleton instance inserted into flush queue used for signaling.
091   */
092  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
093    @Override
094    public long getDelay(TimeUnit unit) {
095      return 0;
096    }
097
098    @Override
099    public int compareTo(Delayed o) {
100      return -1;
101    }
102
103    @Override
104    public boolean equals(Object obj) {
105      return obj == this;
106    }
107
108    @Override
109    public int hashCode() {
110      return 42;
111    }
112  };
113
114
115  /**
116   * @param conf
117   * @param server
118   */
119  public MemStoreFlusher(final Configuration conf,
120      final HRegionServer server) {
121    super();
122    this.conf = conf;
123    this.server = server;
124    this.threadWakeFrequency =
125        conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
126    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
127      90000);
128    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
129    if (handlerCount < 1) {
130      LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, corrected to 1",
131          handlerCount);
132      handlerCount = 1;
133    }
134    this.flushHandlers = new FlushHandler[handlerCount];
135    LOG.info("globalMemStoreLimit="
136        + TraditionalBinaryPrefix
137            .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
138        + ", globalMemStoreLimitLowMark="
139        + TraditionalBinaryPrefix.long2String(
140          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
141        + ", Offheap="
142        + (this.server.getRegionServerAccounting().isOffheap()));
143  }
144
145  public LongAdder getUpdatesBlockedMsHighWater() {
146    return this.updatesBlockedMsHighWater;
147  }
148
149  /**
150   * The memstore across all regions has exceeded the low water mark. Pick
151   * one region to flush and flush it synchronously (this is called from the
152   * flush thread)
153   * @return true if successful
154   */
155  private boolean flushOneForGlobalPressure(FlushType flushType) {
156    SortedMap<Long, HRegion> regionsBySize = null;
157    switch(flushType) {
158      case ABOVE_OFFHEAP_HIGHER_MARK:
159      case ABOVE_OFFHEAP_LOWER_MARK:
160        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
161        break;
162      case ABOVE_ONHEAP_HIGHER_MARK:
163      case ABOVE_ONHEAP_LOWER_MARK:
164      default:
165        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
166    }
167    Set<HRegion> excludedRegions = new HashSet<>();
168
169    double secondaryMultiplier
170      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
171
172    boolean flushedOne = false;
173    while (!flushedOne) {
174      // Find the biggest region that doesn't have too many storefiles (might be null!)
175      HRegion bestFlushableRegion =
176          getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
177      // Find the biggest region, total, even if it might have too many flushes.
178      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
179      // Find the biggest region that is a secondary region
180      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
181      if (bestAnyRegion == null) {
182        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
183        bestAnyRegion = bestRegionReplica;
184      }
185      if (bestAnyRegion == null) {
186        LOG.error("Above memory mark but there are no flushable regions!");
187        return false;
188      }
189
190      HRegion regionToFlush;
191      long bestAnyRegionSize;
192      long bestFlushableRegionSize;
193      switch(flushType) {
194        case ABOVE_OFFHEAP_HIGHER_MARK:
195        case ABOVE_OFFHEAP_LOWER_MARK:
196          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
197          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
198          break;
199
200        case ABOVE_ONHEAP_HIGHER_MARK:
201        case ABOVE_ONHEAP_LOWER_MARK:
202          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
203          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
204          break;
205
206        default:
207          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
208          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
209      }
210      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
211        // Even if it's not supposed to be flushed, pick a region if it's more than twice
212        // as big as the best flushable one - otherwise when we're under pressure we make
213        // lots of little flushes and cause lots of compactions, etc, which just makes
214        // life worse!
215        if (LOG.isDebugEnabled()) {
216          LOG.debug("Under global heap pressure: " + "Region "
217              + bestAnyRegion.getRegionInfo().getRegionNameAsString()
218              + " has too many " + "store files, but is "
219              + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
220              + " vs best flushable region's "
221              + TraditionalBinaryPrefix.long2String(
222              bestFlushableRegionSize, "", 1)
223              + ". Choosing the bigger.");
224        }
225        regionToFlush = bestAnyRegion;
226      } else {
227        if (bestFlushableRegion == null) {
228          regionToFlush = bestAnyRegion;
229        } else {
230          regionToFlush = bestFlushableRegion;
231        }
232      }
233
234      long regionToFlushSize;
235      long bestRegionReplicaSize;
236      switch(flushType) {
237        case ABOVE_OFFHEAP_HIGHER_MARK:
238        case ABOVE_OFFHEAP_LOWER_MARK:
239          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
240          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
241          break;
242
243        case ABOVE_ONHEAP_HIGHER_MARK:
244        case ABOVE_ONHEAP_LOWER_MARK:
245          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
246          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
247          break;
248
249        default:
250          regionToFlushSize = regionToFlush.getMemStoreDataSize();
251          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
252      }
253
254      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
255        // A concurrency issue (such as splitting region) may happen such that the online region
256        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
257        // getBiggestMemStoreRegion(). This means that we can come out of the loop
258        LOG.debug("Above memory mark but there is no flushable region");
259        return false;
260      }
261
262      if (regionToFlush == null ||
263          (bestRegionReplica != null &&
264           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
265           (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) {
266        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
267            " due to global heap pressure. Total memstore off heap size=" +
268            TraditionalBinaryPrefix.long2String(
269              server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
270            " memstore heap size=" + TraditionalBinaryPrefix.long2String(
271              server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
272        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
273        if (!flushedOne) {
274          LOG.info("Excluding secondary region " + bestRegionReplica +
275              " - trying to find a different region to refresh files.");
276          excludedRegions.add(bestRegionReplica);
277        }
278      } else {
279        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
280            "Flush type=" + flushType.toString() +
281            ", Total Memstore Heap size=" +
282            TraditionalBinaryPrefix.long2String(
283                server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) +
284            ", Total Memstore Off-Heap size=" +
285            TraditionalBinaryPrefix.long2String(
286                server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
287            ", Region memstore size=" +
288            TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
289        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);
290
291        if (!flushedOne) {
292          LOG.info("Excluding unflushable region " + regionToFlush +
293              " - trying to find a different region to flush.");
294          excludedRegions.add(regionToFlush);
295        }
296      }
297    }
298    return true;
299  }
300
301  /**
302   * @return Return memstore offheap size or null if <code>r</code> is null
303   */
304  private static long getMemStoreOffHeapSize(HRegion r) {
305    return r == null? 0: r.getMemStoreOffHeapSize();
306  }
307
308  /**
309   * @return Return memstore heap size or null if <code>r</code> is null
310   */
311  private static long getMemStoreHeapSize(HRegion r) {
312    return r == null? 0: r.getMemStoreHeapSize();
313  }
314
315  /**
316   * @return Return memstore data size or null if <code>r</code> is null
317   */
318  private static long getMemStoreDataSize(HRegion r) {
319    return r == null? 0: r.getMemStoreDataSize();
320  }
321
322  private class FlushHandler extends Thread {
323
324    private FlushHandler(String name) {
325      super(name);
326    }
327
328    @Override
329    public void run() {
330      while (!server.isStopped()) {
331        FlushQueueEntry fqe = null;
332        try {
333          wakeupPending.set(false); // allow someone to wake us up again
334          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
335          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
336            FlushType type = isAboveLowWaterMark();
337            if (type != FlushType.NORMAL) {
338              LOG.debug("Flush thread woke up because memory above low water="
339                  + TraditionalBinaryPrefix.long2String(
340                    server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
341              // For offheap memstore, even if the lower water mark was breached due to heap overhead
342              // we still select the regions based on the region's memstore data size.
343              // TODO : If we want to decide based on heap over head it can be done without tracking
344              // it per region.
345              if (!flushOneForGlobalPressure(type)) {
346                // Wasn't able to flush any region, but we're above low water mark
347                // This is unlikely to happen, but might happen when closing the
348                // entire server - another thread is flushing regions. We'll just
349                // sleep a little bit to avoid spinning, and then pretend that
350                // we flushed one, so anyone blocked will check again
351                Thread.sleep(1000);
352                wakeUpIfBlocking();
353              }
354              // Enqueue another one of these tokens so we'll wake up again
355              wakeupFlushThread();
356            }
357            continue;
358          }
359          FlushRegionEntry fre = (FlushRegionEntry) fqe;
360          if (!flushRegion(fre)) {
361            break;
362          }
363        } catch (InterruptedException ex) {
364          continue;
365        } catch (ConcurrentModificationException ex) {
366          continue;
367        } catch (Exception ex) {
368          LOG.error("Cache flusher failed for entry " + fqe, ex);
369          if (!server.checkFileSystem()) {
370            break;
371          }
372        }
373      }
374      synchronized (regionsInQueue) {
375        regionsInQueue.clear();
376        flushQueue.clear();
377      }
378
379      // Signal anyone waiting, so they see the close flag
380      wakeUpIfBlocking();
381      LOG.info(getName() + " exiting");
382    }
383  }
384
385
386  private void wakeupFlushThread() {
387    if (wakeupPending.compareAndSet(false, true)) {
388      flushQueue.add(WAKEUPFLUSH_INSTANCE);
389    }
390  }
391
392  private HRegion getBiggestMemStoreRegion(
393      SortedMap<Long, HRegion> regionsBySize,
394      Set<HRegion> excludedRegions,
395      boolean checkStoreFileCount) {
396    synchronized (regionsInQueue) {
397      for (HRegion region : regionsBySize.values()) {
398        if (excludedRegions.contains(region)) {
399          continue;
400        }
401
402        if (region.writestate.flushing || !region.writestate.writesEnabled) {
403          continue;
404        }
405
406        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
407          continue;
408        }
409        return region;
410      }
411    }
412    return null;
413  }
414
415  private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
416      Set<HRegion> excludedRegions) {
417    synchronized (regionsInQueue) {
418      for (HRegion region : regionsBySize.values()) {
419        if (excludedRegions.contains(region)) {
420          continue;
421        }
422
423        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
424          continue;
425        }
426
427        return region;
428      }
429    }
430    return null;
431  }
432
433  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
434    try {
435      return region.refreshStoreFiles();
436    } catch (IOException e) {
437      LOG.warn("Refreshing store files failed with exception", e);
438    }
439    return false;
440  }
441
442  /**
443   * Return true if global memory usage is above the high watermark
444   */
445  private FlushType isAboveHighWaterMark() {
446    return server.getRegionServerAccounting().isAboveHighWaterMark();
447  }
448
449  /**
450   * Return true if we're above the low watermark
451   */
452  private FlushType isAboveLowWaterMark() {
453    return server.getRegionServerAccounting().isAboveLowWaterMark();
454  }
455
456  @Override
457  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
458    return this.requestFlush(r, null, tracker);
459  }
460
461  @Override
462  public boolean requestFlush(HRegion r, List<byte[]> families,
463      FlushLifeCycleTracker tracker) {
464    synchronized (regionsInQueue) {
465      if (!regionsInQueue.containsKey(r)) {
466        // This entry has no delay so it will be added at the top of the flush
467        // queue. It'll come out near immediately.
468        FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
469        this.regionsInQueue.put(r, fqe);
470        this.flushQueue.add(fqe);
471        r.incrementFlushesQueuedCount();
472        return true;
473      } else {
474        tracker.notExecuted("Flush already requested on " + r);
475        return false;
476      }
477    }
478  }
479
480  @Override
481  public boolean requestDelayedFlush(HRegion r, long delay) {
482    synchronized (regionsInQueue) {
483      if (!regionsInQueue.containsKey(r)) {
484        // This entry has some delay
485        FlushRegionEntry fqe =
486            new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
487        fqe.requeue(delay);
488        this.regionsInQueue.put(r, fqe);
489        this.flushQueue.add(fqe);
490        r.incrementFlushesQueuedCount();
491        return true;
492      }
493      return false;
494    }
495  }
496
497  public int getFlushQueueSize() {
498    return flushQueue.size();
499  }
500
501  /**
502   * Only interrupt once it's done with a run through the work loop.
503   */
504  void interruptIfNecessary() {
505    lock.writeLock().lock();
506    try {
507      for (FlushHandler flushHander : flushHandlers) {
508        if (flushHander != null) flushHander.interrupt();
509      }
510    } finally {
511      lock.writeLock().unlock();
512    }
513  }
514
515  synchronized void start(UncaughtExceptionHandler eh) {
516    ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
517      .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
518      .setDaemon(true).setUncaughtExceptionHandler(eh).build();
519    for (int i = 0; i < flushHandlers.length; i++) {
520      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
521      flusherThreadFactory.newThread(flushHandlers[i]);
522      flushHandlers[i].start();
523    }
524  }
525
526  boolean isAlive() {
527    for (FlushHandler flushHander : flushHandlers) {
528      if (flushHander != null && flushHander.isAlive()) {
529        return true;
530      }
531    }
532    return false;
533  }
534
535  void join() {
536    for (FlushHandler flushHander : flushHandlers) {
537      if (flushHander != null) {
538        Threads.shutdown(flushHander);
539      }
540    }
541  }
542
543  /**
544   * A flushRegion that checks store file count.  If too many, puts the flush
545   * on delay queue to retry later.
546   * @param fqe
547   * @return true if the region was successfully flushed, false otherwise. If
548   * false, there will be accompanying log messages explaining why the region was
549   * not flushed.
550   */
551  private boolean flushRegion(final FlushRegionEntry fqe) {
552    HRegion region = fqe.region;
553    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
554      if (fqe.isMaximumWait(this.blockingWaitTime)) {
555        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
556          "ms on a compaction to clean up 'too many store files'; waited " +
557          "long enough... proceeding with flush of " +
558          region.getRegionInfo().getRegionNameAsString());
559      } else {
560        // If this is first time we've been put off, then emit a log message.
561        if (fqe.getRequeueCount() <= 0) {
562          // Note: We don't impose blockingStoreFiles constraint on meta regions
563          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
564              region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
565              this.blockingWaitTime);
566          if (!this.server.compactSplitThread.requestSplit(region)) {
567            try {
568              this.server.compactSplitThread.requestSystemCompaction(region,
569                Thread.currentThread().getName());
570            } catch (IOException e) {
571              e = e instanceof RemoteException ?
572                      ((RemoteException)e).unwrapRemoteException() : e;
573              LOG.error("Cache flush failed for region " +
574                Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
575            }
576          }
577        }
578
579        // Put back on the queue.  Have it come back out of the queue
580        // after a delay of this.blockingWaitTime / 100 ms.
581        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
582        // Tell a lie, it's not flushed but it's ok
583        return true;
584      }
585    }
586    return flushRegion(region, false, fqe.families, fqe.getTracker());
587  }
588
589  /**
590   * Flush a region.
591   * @param region Region to flush.
592   * @param emergencyFlush Set if we are being force flushed. If true the region
593   * needs to be removed from the flush queue. If false, when we were called
594   * from the main flusher run loop and we got the entry to flush by calling
595   * poll on the flush queue (which removed it).
596   * @param families stores of region to flush.
597   * @return true if the region was successfully flushed, false otherwise. If
598   * false, there will be accompanying log messages explaining why the region was
599   * not flushed.
600   */
601  private boolean flushRegion(HRegion region, boolean emergencyFlush,
602      List<byte[]> families, FlushLifeCycleTracker tracker) {
603    synchronized (this.regionsInQueue) {
604      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
605      // Use the start time of the FlushRegionEntry if available
606      if (fqe != null && emergencyFlush) {
607        // Need to remove from region from delay queue. When NOT an
608        // emergencyFlush, then item was removed via a flushQueue.poll.
609        flushQueue.remove(fqe);
610      }
611    }
612
613    tracker.beforeExecution();
614    lock.readLock().lock();
615    try {
616      notifyFlushRequest(region, emergencyFlush);
617      FlushResult flushResult = region.flushcache(families, false, tracker);
618      boolean shouldCompact = flushResult.isCompactionNeeded();
619      // We just want to check the size
620      boolean shouldSplit = region.checkSplit().isPresent();
621      if (shouldSplit) {
622        this.server.compactSplitThread.requestSplit(region);
623      } else if (shouldCompact) {
624        server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
625      }
626    } catch (DroppedSnapshotException ex) {
627      // Cache flush can fail in a few places. If it fails in a critical
628      // section, we get a DroppedSnapshotException and a replay of wal
629      // is required. Currently the only way to do this is a restart of
630      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
631      // where hdfs was bad but passed the hdfs check).
632      server.abort("Replay of WAL required. Forcing server shutdown", ex);
633      return false;
634    } catch (IOException ex) {
635      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
636      LOG.error(
637        "Cache flush failed"
638            + (region != null ? (" for region " +
639                Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
640              : ""), ex);
641      if (!server.checkFileSystem()) {
642        return false;
643      }
644    } finally {
645      lock.readLock().unlock();
646      wakeUpIfBlocking();
647      tracker.afterExecution();
648    }
649    return true;
650  }
651
652  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
653    FlushType type = null;
654    if (emergencyFlush) {
655      type = isAboveHighWaterMark();
656      if (type == null) {
657        type = isAboveLowWaterMark();
658      }
659    }
660    for (FlushRequestListener listener : flushRequestListeners) {
661      listener.flushRequested(type, region);
662    }
663  }
664
665  private void wakeUpIfBlocking() {
666    synchronized (blockSignal) {
667      blockSignal.notifyAll();
668    }
669  }
670
671  private boolean isTooManyStoreFiles(Region region) {
672
673    // When compaction is disabled, the region is flushable
674    if (!region.getTableDescriptor().isCompactionEnabled()) {
675      return false;
676    }
677
678    for (Store store : region.getStores()) {
679      if (store.hasTooManyStoreFiles()) {
680        return true;
681      }
682    }
683    return false;
684  }
685
686  private int getStoreFileCount(Region region) {
687    int count = 0;
688    for (Store store : region.getStores()) {
689      count += store.getStorefilesCount();
690    }
691    return count;
692  }
693
694  /**
695   * Check if the regionserver's memstore memory usage is greater than the
696   * limit. If so, flush regions with the biggest memstores until we're down
697   * to the lower limit. This method blocks callers until we're down to a safe
698   * amount of memstore consumption.
699   */
700  public void reclaimMemStoreMemory() {
701    try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) {
702      FlushType flushType = isAboveHighWaterMark();
703      if (flushType != FlushType.NORMAL) {
704        TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
705        long start = EnvironmentEdgeManager.currentTime();
706        long nextLogTimeMs = start;
707        synchronized (this.blockSignal) {
708          boolean blocked = false;
709          long startTime = 0;
710          boolean interrupted = false;
711          try {
712            flushType = isAboveHighWaterMark();
713            while (flushType != FlushType.NORMAL && !server.isStopped()) {
714              if (!blocked) {
715                startTime = EnvironmentEdgeManager.currentTime();
716                if (!server.getRegionServerAccounting().isOffheap()) {
717                  logMsg("global memstore heapsize",
718                      server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
719                      server.getRegionServerAccounting().getGlobalMemStoreLimit());
720                } else {
721                  switch (flushType) {
722                    case ABOVE_OFFHEAP_HIGHER_MARK:
723                      logMsg("the global offheap memstore datasize",
724                          server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
725                          server.getRegionServerAccounting().getGlobalMemStoreLimit());
726                      break;
727                    case ABOVE_ONHEAP_HIGHER_MARK:
728                      logMsg("global memstore heapsize",
729                          server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
730                          server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
731                      break;
732                    default:
733                      break;
734                  }
735                }
736              }
737              blocked = true;
738              wakeupFlushThread();
739              try {
740                // we should be able to wait forever, but we've seen a bug where
741                // we miss a notify, so put a 5 second bound on it at least.
742                blockSignal.wait(5 * 1000);
743              } catch (InterruptedException ie) {
744                LOG.warn("Interrupted while waiting");
745                interrupted = true;
746              }
747              long nowMs = EnvironmentEdgeManager.currentTime();
748              if (nowMs >= nextLogTimeMs) {
749                LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
750                nextLogTimeMs = nowMs + 1000;
751              }
752              flushType = isAboveHighWaterMark();
753            }
754          } finally {
755            if (interrupted) {
756              Thread.currentThread().interrupt();
757            }
758          }
759
760          if(blocked){
761            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
762            if(totalTime > 0){
763              this.updatesBlockedMsHighWater.add(totalTime);
764            }
765            LOG.info("Unblocking updates for server " + server.toString());
766          }
767        }
768      } else {
769        flushType = isAboveLowWaterMark();
770        if (flushType != FlushType.NORMAL) {
771          wakeupFlushThread();
772        }
773      }
774    }
775  }
776
777  private void logMsg(String type, long val, long max) {
778    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
779        TraditionalBinaryPrefix.long2String(val, "", 1),
780        TraditionalBinaryPrefix.long2String(max, "", 1));
781  }
782
783  @Override
784  public String toString() {
785    return "flush_queue="
786        + flushQueue.size();
787  }
788
789  public String dumpQueue() {
790    StringBuilder queueList = new StringBuilder();
791    queueList.append("Flush Queue Queue dump:\n");
792    queueList.append("  Flush Queue:\n");
793    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
794
795    while(it.hasNext()){
796      queueList.append("    "+it.next().toString());
797      queueList.append("\n");
798    }
799
800    return queueList.toString();
801  }
802
803  /**
804   * Register a MemstoreFlushListener
805   * @param listener
806   */
807  @Override
808  public void registerFlushRequestListener(final FlushRequestListener listener) {
809    this.flushRequestListeners.add(listener);
810  }
811
812  /**
813   * Unregister the listener from MemstoreFlushListeners
814   * @param listener
815   * @return true when passed listener is unregistered successfully.
816   */
817  @Override
818  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
819    return this.flushRequestListeners.remove(listener);
820  }
821
822  /**
823   * Sets the global memstore limit to a new size.
824   * @param globalMemStoreSize
825   */
826  @Override
827  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
828    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
829    reclaimMemStoreMemory();
830  }
831
832  interface FlushQueueEntry extends Delayed {
833  }
834
835  /**
836   * Datastructure used in the flush queue.  Holds region and retry count.
837   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
838   * construction, the delay is zero. When added to a delay queue, we'll come
839   * out near immediately.  Call {@link #requeue(long)} passing delay in
840   * milliseconds before readding to delay queue if you want it to stay there
841   * a while.
842   */
843  static class FlushRegionEntry implements FlushQueueEntry {
844    private final HRegion region;
845
846    private final long createTime;
847    private long whenToExpire;
848    private int requeueCount = 0;
849
850    private final List<byte[]> families;
851
852    private final FlushLifeCycleTracker tracker;
853
854    FlushRegionEntry(final HRegion r, List<byte[]> families,
855        FlushLifeCycleTracker tracker) {
856      this.region = r;
857      this.createTime = EnvironmentEdgeManager.currentTime();
858      this.whenToExpire = this.createTime;
859      this.families = families;
860      this.tracker = tracker;
861    }
862
863    /**
864     * @param maximumWait
865     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
866     */
867    public boolean isMaximumWait(final long maximumWait) {
868      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
869    }
870
871    /**
872     * @return Count of times {@link #requeue(long)} was called; i.e this is
873     * number of times we've been requeued.
874     */
875    public int getRequeueCount() {
876      return this.requeueCount;
877    }
878
879    public FlushLifeCycleTracker getTracker() {
880      return tracker;
881    }
882
883    /**
884     * @param when When to expire, when to come up out of the queue.
885     * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
886     * to whatever you pass.
887     * @return This.
888     */
889    public FlushRegionEntry requeue(final long when) {
890      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
891      this.requeueCount++;
892      return this;
893    }
894
895    @Override
896    public long getDelay(TimeUnit unit) {
897      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
898          TimeUnit.MILLISECONDS);
899    }
900
901    @Override
902    public int compareTo(Delayed other) {
903      // Delay is compared first. If there is a tie, compare region's hash code
904      int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
905        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
906      if (ret != 0) {
907        return ret;
908      }
909      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
910      return hashCode() - otherEntry.hashCode();
911    }
912
913    @Override
914    public String toString() {
915      return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]";
916    }
917
918    @Override
919    public int hashCode() {
920      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
921      return hash ^ region.hashCode();
922    }
923
924   @Override
925    public boolean equals(Object obj) {
926      if (this == obj) {
927        return true;
928      }
929      if (obj == null || getClass() != obj.getClass()) {
930        return false;
931      }
932      FlushRegionEntry other = (FlushRegionEntry) obj;
933      if (!Bytes.equals(this.region.getRegionInfo().getRegionName(),
934          other.region.getRegionInfo().getRegionName())) {
935        return false;
936      }
937      return compareTo(other) == 0;
938    }
939  }
940}