001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.ConcurrentModificationException;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.SortedMap;
032import java.util.concurrent.BlockingQueue;
033import java.util.concurrent.DelayQueue;
034import java.util.concurrent.Delayed;
035import java.util.concurrent.ThreadFactory;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.atomic.LongAdder;
039import java.util.concurrent.locks.ReentrantReadWriteLock;
040
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.DroppedSnapshotException;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.client.RegionReplicaUtil;
045import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
046import org.apache.hadoop.hbase.trace.TraceUtil;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.ipc.RemoteException;
052import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
054import org.apache.htrace.core.TraceScope;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
/**
 * Flushes region memstores on request, using a pool of {@code FlushHandler} threads.
 *
 * NOTE: The flush handlers extend Thread rather than Chore because the sleep time
 * can be interrupted when there is something to do, rather than the Chore
 * sleep time which is invariant.
 *
 * @see FlushRequester
 */
068@InterfaceAudience.Private
069class MemStoreFlusher implements FlushRequester {
  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);

  // Configuration handed to the constructor; also consulted when picking a region to flush
  // under global memory pressure.
  private Configuration conf;
  // These two data members go together.  Any entry in the one must have
  // a corresponding entry in the other.
  // regionsInQueue is read and modified while synchronized on itself.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  // True while a wake-up token has been queued and no flush handler has reset the flag yet;
  // keeps wakeupFlushThread() from queuing more than one token at a time.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  // Longest time, in milliseconds, a flush handler blocks on the queue before it wakes up
  // and checks memory pressure on its own.
  private final long threadWakeFrequency;
  private final HRegionServer server;
  // Read lock is held for the duration of each region flush; the write lock is taken by
  // interruptIfNecessary() so handlers are only interrupted between flushes.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Monitor that callers of reclaimMemStoreMemory() wait on; notified by wakeUpIfBlocking().
  private final Object blockSignal = new Object();

  // Upper bound, in milliseconds, on how long a flush is put off because the region has too
  // many store files ("hbase.hstore.blockingWaitTime").
  private long blockingWaitTime;
  // Total milliseconds callers have spent blocked in reclaimMemStoreMemory().
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  // One slot per flush handler thread; slots stay null until start() fills them.
  private final FlushHandler[] flushHandlers;
  // Listeners told about every flush just before it runs (see notifyFlushRequest).
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
089
090  /**
091   * Singleton instance inserted into flush queue used for signaling.
092   */
093  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
094    @Override
095    public long getDelay(TimeUnit unit) {
096      return 0;
097    }
098
099    @Override
100    public int compareTo(Delayed o) {
101      return -1;
102    }
103
104    @Override
105    public boolean equals(Object obj) {
106      return obj == this;
107    }
108
109    @Override
110    public int hashCode() {
111      return 42;
112    }
113  };
114
115
116  /**
117   * @param conf
118   * @param server
119   */
120  public MemStoreFlusher(final Configuration conf,
121      final HRegionServer server) {
122    super();
123    this.conf = conf;
124    this.server = server;
125    this.threadWakeFrequency =
126        conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
127    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
128      90000);
129    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
130    if (handlerCount < 1) {
131      LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, corrected to 1",
132          handlerCount);
133      handlerCount = 1;
134    }
135    this.flushHandlers = new FlushHandler[handlerCount];
136    LOG.info("globalMemStoreLimit="
137        + TraditionalBinaryPrefix
138            .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
139        + ", globalMemStoreLimitLowMark="
140        + TraditionalBinaryPrefix.long2String(
141          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
142        + ", Offheap="
143        + (this.server.getRegionServerAccounting().isOffheap()));
144  }
145
146  public LongAdder getUpdatesBlockedMsHighWater() {
147    return this.updatesBlockedMsHighWater;
148  }
149
150  /**
151   * The memstore across all regions has exceeded the low water mark. Pick
152   * one region to flush and flush it synchronously (this is called from the
153   * flush thread)
154   * @return true if successful
155   */
156  private boolean flushOneForGlobalPressure(FlushType flushType) {
157    SortedMap<Long, Collection<HRegion>> regionsBySize = null;
158    switch(flushType) {
159      case ABOVE_OFFHEAP_HIGHER_MARK:
160      case ABOVE_OFFHEAP_LOWER_MARK:
161        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
162        break;
163      case ABOVE_ONHEAP_HIGHER_MARK:
164      case ABOVE_ONHEAP_LOWER_MARK:
165      default:
166        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
167    }
168    Set<HRegion> excludedRegions = new HashSet<>();
169
170    double secondaryMultiplier
171      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
172
173    boolean flushedOne = false;
174    while (!flushedOne) {
175      // Find the biggest region that doesn't have too many storefiles (might be null!)
176      HRegion bestFlushableRegion =
177          getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
178      // Find the biggest region, total, even if it might have too many flushes.
179      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
180      // Find the biggest region that is a secondary region
181      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
182      if (bestAnyRegion == null) {
183        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
184        bestAnyRegion = bestRegionReplica;
185      }
186      if (bestAnyRegion == null) {
187        LOG.error("Above memory mark but there are no flushable regions!");
188        return false;
189      }
190
191      HRegion regionToFlush;
192      long bestAnyRegionSize;
193      long bestFlushableRegionSize;
194      switch(flushType) {
195        case ABOVE_OFFHEAP_HIGHER_MARK:
196        case ABOVE_OFFHEAP_LOWER_MARK:
197          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
198          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
199          break;
200
201        case ABOVE_ONHEAP_HIGHER_MARK:
202        case ABOVE_ONHEAP_LOWER_MARK:
203          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
204          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
205          break;
206
207        default:
208          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
209          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
210      }
211      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
212        // Even if it's not supposed to be flushed, pick a region if it's more than twice
213        // as big as the best flushable one - otherwise when we're under pressure we make
214        // lots of little flushes and cause lots of compactions, etc, which just makes
215        // life worse!
216        if (LOG.isDebugEnabled()) {
217          LOG.debug("Under global heap pressure: " + "Region "
218              + bestAnyRegion.getRegionInfo().getRegionNameAsString()
219              + " has too many " + "store files, but is "
220              + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
221              + " vs best flushable region's "
222              + TraditionalBinaryPrefix.long2String(
223              bestFlushableRegionSize, "", 1)
224              + ". Choosing the bigger.");
225        }
226        regionToFlush = bestAnyRegion;
227      } else {
228        if (bestFlushableRegion == null) {
229          regionToFlush = bestAnyRegion;
230        } else {
231          regionToFlush = bestFlushableRegion;
232        }
233      }
234
235      long regionToFlushSize;
236      long bestRegionReplicaSize;
237      switch(flushType) {
238        case ABOVE_OFFHEAP_HIGHER_MARK:
239        case ABOVE_OFFHEAP_LOWER_MARK:
240          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
241          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
242          break;
243
244        case ABOVE_ONHEAP_HIGHER_MARK:
245        case ABOVE_ONHEAP_LOWER_MARK:
246          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
247          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
248          break;
249
250        default:
251          regionToFlushSize = regionToFlush.getMemStoreDataSize();
252          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
253      }
254
255      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
256        // A concurrency issue (such as splitting region) may happen such that the online region
257        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
258        // getBiggestMemStoreRegion(). This means that we can come out of the loop
259        LOG.debug("Above memory mark but there is no flushable region");
260        return false;
261      }
262
263      if (regionToFlush == null ||
264          (bestRegionReplica != null &&
265           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
266           (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) {
267        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
268            " due to global heap pressure. Total memstore off heap size=" +
269            TraditionalBinaryPrefix.long2String(
270              server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
271            " memstore heap size=" + TraditionalBinaryPrefix.long2String(
272              server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
273        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
274        if (!flushedOne) {
275          LOG.info("Excluding secondary region " + bestRegionReplica +
276              " - trying to find a different region to refresh files.");
277          excludedRegions.add(bestRegionReplica);
278        }
279      } else {
280        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
281            "Flush type=" + flushType.toString() +
282            ", Total Memstore Heap size=" +
283            TraditionalBinaryPrefix.long2String(
284                server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) +
285            ", Total Memstore Off-Heap size=" +
286            TraditionalBinaryPrefix.long2String(
287                server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
288            ", Region memstore size=" +
289            TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
290        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);
291
292        if (!flushedOne) {
293          LOG.info("Excluding unflushable region " + regionToFlush +
294              " - trying to find a different region to flush.");
295          excludedRegions.add(regionToFlush);
296        }
297      }
298    }
299    return true;
300  }
301
302  /**
303   * @return Return memstore offheap size or null if <code>r</code> is null
304   */
305  private static long getMemStoreOffHeapSize(HRegion r) {
306    return r == null? 0: r.getMemStoreOffHeapSize();
307  }
308
309  /**
310   * @return Return memstore heap size or null if <code>r</code> is null
311   */
312  private static long getMemStoreHeapSize(HRegion r) {
313    return r == null? 0: r.getMemStoreHeapSize();
314  }
315
316  /**
317   * @return Return memstore data size or null if <code>r</code> is null
318   */
319  private static long getMemStoreDataSize(HRegion r) {
320    return r == null? 0: r.getMemStoreDataSize();
321  }
322
  /**
   * Worker thread that takes entries off the flush queue until the server is stopped.
   * A region entry is flushed; a poll timeout or the wake-up marker triggers a check of the
   * global low water mark instead. On exit the thread empties both the queue and the
   * region map and wakes any caller blocked on the block signal.
   */
  private class FlushHandler extends Thread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          // null means the poll timed out; the marker means someone asked for a pressure check.
          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
            FlushType type = isAboveLowWaterMark();
            if (type != FlushType.NORMAL) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + TraditionalBinaryPrefix.long2String(
                    server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
              // For offheap memstore, even if the lower water mark was breached due to heap overhead
              // we still select the regions based on the region's memstore data size.
              // TODO : If we want to decide based on heap over head it can be done without tracking
              // it per region.
              if (!flushOneForGlobalPressure(type)) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          // Anything else on the queue is a request to flush one region.
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            // A false result ends this handler thread.
            break;
          }
        } catch (InterruptedException ex) {
          // Go back to the loop condition so a stopped server is noticed.
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          // Keep running only while the filesystem check still passes.
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      // Thread is exiting: drop everything that is still queued.
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }
385
386
387  private void wakeupFlushThread() {
388    if (wakeupPending.compareAndSet(false, true)) {
389      flushQueue.add(WAKEUPFLUSH_INSTANCE);
390    }
391  }
392
393  private HRegion getBiggestMemStoreRegion(
394      SortedMap<Long, Collection<HRegion>> regionsBySize,
395      Set<HRegion> excludedRegions,
396      boolean checkStoreFileCount) {
397    synchronized (regionsInQueue) {
398      for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
399        for (HRegion region : entry.getValue()) {
400          if (excludedRegions.contains(region)) {
401            continue;
402          }
403
404          if (region.writestate.flushing || !region.writestate.writesEnabled) {
405            continue;
406          }
407
408          if (checkStoreFileCount && isTooManyStoreFiles(region)) {
409            continue;
410          }
411          return region;
412        }
413      }
414    }
415    return null;
416  }
417
418  private HRegion getBiggestMemStoreOfRegionReplica(
419      SortedMap<Long, Collection<HRegion>> regionsBySize,
420      Set<HRegion> excludedRegions) {
421    synchronized (regionsInQueue) {
422      for (Map.Entry<Long, Collection<HRegion>> entry : regionsBySize.entrySet()) {
423        for (HRegion region : entry.getValue()) {
424          if (excludedRegions.contains(region)) {
425            continue;
426          }
427
428          if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
429            continue;
430          }
431          return region;
432        }
433      }
434    }
435    return null;
436  }
437
438  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
439    try {
440      return region.refreshStoreFiles();
441    } catch (IOException e) {
442      LOG.warn("Refreshing store files failed with exception", e);
443    }
444    return false;
445  }
446
447  /**
448   * Return true if global memory usage is above the high watermark
449   */
450  private FlushType isAboveHighWaterMark() {
451    return server.getRegionServerAccounting().isAboveHighWaterMark();
452  }
453
454  /**
455   * Return true if we're above the low watermark
456   */
457  private FlushType isAboveLowWaterMark() {
458    return server.getRegionServerAccounting().isAboveLowWaterMark();
459  }
460
461  @Override
462  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
463    return this.requestFlush(r, null, tracker);
464  }
465
466  @Override
467  public boolean requestFlush(HRegion r, List<byte[]> families,
468      FlushLifeCycleTracker tracker) {
469    synchronized (regionsInQueue) {
470      if (!regionsInQueue.containsKey(r)) {
471        // This entry has no delay so it will be added at the top of the flush
472        // queue. It'll come out near immediately.
473        FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
474        this.regionsInQueue.put(r, fqe);
475        this.flushQueue.add(fqe);
476        r.incrementFlushesQueuedCount();
477        return true;
478      } else {
479        tracker.notExecuted("Flush already requested on " + r);
480        return false;
481      }
482    }
483  }
484
485  @Override
486  public boolean requestDelayedFlush(HRegion r, long delay) {
487    synchronized (regionsInQueue) {
488      if (!regionsInQueue.containsKey(r)) {
489        // This entry has some delay
490        FlushRegionEntry fqe =
491            new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
492        fqe.requeue(delay);
493        this.regionsInQueue.put(r, fqe);
494        this.flushQueue.add(fqe);
495        r.incrementFlushesQueuedCount();
496        return true;
497      }
498      return false;
499    }
500  }
501
502  public int getFlushQueueSize() {
503    return flushQueue.size();
504  }
505
506  /**
507   * Only interrupt once it's done with a run through the work loop.
508   */
509  void interruptIfNecessary() {
510    lock.writeLock().lock();
511    try {
512      for (FlushHandler flushHander : flushHandlers) {
513        if (flushHander != null) flushHander.interrupt();
514      }
515    } finally {
516      lock.writeLock().unlock();
517    }
518  }
519
520  synchronized void start(UncaughtExceptionHandler eh) {
521    ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
522      .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
523      .setDaemon(true).setUncaughtExceptionHandler(eh).build();
524    for (int i = 0; i < flushHandlers.length; i++) {
525      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
526      flusherThreadFactory.newThread(flushHandlers[i]);
527      flushHandlers[i].start();
528    }
529  }
530
531  boolean isAlive() {
532    for (FlushHandler flushHander : flushHandlers) {
533      if (flushHander != null && flushHander.isAlive()) {
534        return true;
535      }
536    }
537    return false;
538  }
539
540  void join() {
541    for (FlushHandler flushHander : flushHandlers) {
542      if (flushHander != null) {
543        Threads.shutdown(flushHander);
544      }
545    }
546  }
547
548  /**
549   * A flushRegion that checks store file count.  If too many, puts the flush
550   * on delay queue to retry later.
551   * @param fqe
552   * @return true if the region was successfully flushed, false otherwise. If
553   * false, there will be accompanying log messages explaining why the region was
554   * not flushed.
555   */
556  private boolean flushRegion(final FlushRegionEntry fqe) {
557    HRegion region = fqe.region;
558    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
559      if (fqe.isMaximumWait(this.blockingWaitTime)) {
560        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
561          "ms on a compaction to clean up 'too many store files'; waited " +
562          "long enough... proceeding with flush of " +
563          region.getRegionInfo().getRegionNameAsString());
564      } else {
565        // If this is first time we've been put off, then emit a log message.
566        if (fqe.getRequeueCount() <= 0) {
567          // Note: We don't impose blockingStoreFiles constraint on meta regions
568          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
569              region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
570              this.blockingWaitTime);
571          if (!this.server.compactSplitThread.requestSplit(region)) {
572            try {
573              this.server.compactSplitThread.requestSystemCompaction(region,
574                Thread.currentThread().getName());
575            } catch (IOException e) {
576              e = e instanceof RemoteException ?
577                      ((RemoteException)e).unwrapRemoteException() : e;
578              LOG.error("Cache flush failed for region " +
579                Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
580            }
581          }
582        }
583
584        // Put back on the queue.  Have it come back out of the queue
585        // after a delay of this.blockingWaitTime / 100 ms.
586        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
587        // Tell a lie, it's not flushed but it's ok
588        return true;
589      }
590    }
591    return flushRegion(region, false, fqe.families, fqe.getTracker());
592  }
593
  /**
   * Flush a region.
   * <p>
   * The region's entry is first removed from the region map. The flush itself runs under the
   * read lock, and afterwards a split or a system compaction is requested if needed. In
   * every case the lock is released, blocked callers are woken and the tracker is told that
   * execution has finished.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   * @param families stores of region to flush.
   * @param tracker notified before and after the flush attempt and passed on to the region.
   * @return false if the server was told to abort because of a DroppedSnapshotException, or
   * if an IOException occurred and the filesystem check failed; true otherwise. Note that
   * true is also returned when the flush failed with an IOException but the filesystem
   * check passed. If false, there will be accompanying log messages explaining why the
   * region was not flushed.
   */
  private boolean flushRegion(HRegion region, boolean emergencyFlush,
      List<byte[]> families, FlushLifeCycleTracker tracker) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue. When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }

    tracker.beforeExecution();
    // Held for the whole flush so interruptIfNecessary() (write lock) waits for it to finish.
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flushcache(families, false, tracker);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit().isPresent();
      // A split takes precedence over a compaction.
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error(
        "Cache flush failed"
            + (region != null ? (" for region " +
                Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
              : ""), ex);
      // Only report failure when the filesystem check fails too.
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      // Let callers blocked in reclaimMemStoreMemory() re-check the memory state.
      wakeUpIfBlocking();
      tracker.afterExecution();
    }
    return true;
  }
656
657  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
658    FlushType type = null;
659    if (emergencyFlush) {
660      type = isAboveHighWaterMark();
661      if (type == null) {
662        type = isAboveLowWaterMark();
663      }
664    }
665    for (FlushRequestListener listener : flushRequestListeners) {
666      listener.flushRequested(type, region);
667    }
668  }
669
670  private void wakeUpIfBlocking() {
671    synchronized (blockSignal) {
672      blockSignal.notifyAll();
673    }
674  }
675
676  private boolean isTooManyStoreFiles(Region region) {
677
678    // When compaction is disabled, the region is flushable
679    if (!region.getTableDescriptor().isCompactionEnabled()) {
680      return false;
681    }
682
683    for (Store store : region.getStores()) {
684      if (store.hasTooManyStoreFiles()) {
685        return true;
686      }
687    }
688    return false;
689  }
690
691  private int getStoreFileCount(Region region) {
692    int count = 0;
693    for (Store store : region.getStores()) {
694      count += store.getStorefilesCount();
695    }
696    return count;
697  }
698
  /**
   * Check if the regionserver's memstore memory usage is greater than the
   * limit. If so, flush regions with the biggest memstores until we're down
   * to the lower limit. This method blocks callers until we're down to a safe
   * amount of memstore consumption.
   * <p>
   * While above the high water mark the caller repeatedly wakes the flush threads and waits
   * on the block signal, until the mark is no longer exceeded or the server is stopped. If
   * only the low water mark is exceeded, the flush threads are woken and the caller returns
   * without blocking. An interrupt received while waiting is remembered and restored on the
   * thread before returning.
   */
  public void reclaimMemStoreMemory() {
    try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) {
      FlushType flushType = isAboveHighWaterMark();
      if (flushType != FlushType.NORMAL) {
        TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
        // start feeds the periodic warning below; startTime feeds the blocked-time metric.
        long start = EnvironmentEdgeManager.currentTime();
        long nextLogTimeMs = start;
        synchronized (this.blockSignal) {
          boolean blocked = false;
          long startTime = 0;
          boolean interrupted = false;
          try {
            // Re-check now that the monitor is held.
            flushType = isAboveHighWaterMark();
            while (flushType != FlushType.NORMAL && !server.isStopped()) {
              // First pass only: record when blocking began and log why.
              if (!blocked) {
                startTime = EnvironmentEdgeManager.currentTime();
                if (!server.getRegionServerAccounting().isOffheap()) {
                  logMsg("global memstore heapsize",
                      server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                      server.getRegionServerAccounting().getGlobalMemStoreLimit());
                } else {
                  switch (flushType) {
                    case ABOVE_OFFHEAP_HIGHER_MARK:
                      logMsg("the global offheap memstore datasize",
                          server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
                          server.getRegionServerAccounting().getGlobalMemStoreLimit());
                      break;
                    case ABOVE_ONHEAP_HIGHER_MARK:
                      logMsg("global memstore heapsize",
                          server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                          server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
                      break;
                    default:
                      break;
                  }
                }
              }
              blocked = true;
              wakeupFlushThread();
              try {
                // we should be able to wait forever, but we've seen a bug where
                // we miss a notify, so put a 5 second bound on it at least.
                blockSignal.wait(5 * 1000);
              } catch (InterruptedException ie) {
                LOG.warn("Interrupted while waiting");
                // Keep waiting; the interrupt is restored in the finally block.
                interrupted = true;
              }
              // Warn about the ongoing block, at most once per second.
              long nowMs = EnvironmentEdgeManager.currentTime();
              if (nowMs >= nextLogTimeMs) {
                LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
                nextLogTimeMs = nowMs + 1000;
              }
              flushType = isAboveHighWaterMark();
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }

          // Account for the time this caller was blocked.
          if(blocked){
            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
            if(totalTime > 0){
              this.updatesBlockedMsHighWater.add(totalTime);
            }
            LOG.info("Unblocking updates for server " + server.toString());
          }
        }
      } else {
        // Below the high mark: nudge the flush threads if the low mark is exceeded.
        flushType = isAboveLowWaterMark();
        if (flushType != FlushType.NORMAL) {
          wakeupFlushThread();
        }
      }
    }
  }
781
782  private void logMsg(String type, long val, long max) {
783    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
784        TraditionalBinaryPrefix.long2String(val, "", 1),
785        TraditionalBinaryPrefix.long2String(max, "", 1));
786  }
787
788  @Override
789  public String toString() {
790    return "flush_queue="
791        + flushQueue.size();
792  }
793
794  public String dumpQueue() {
795    StringBuilder queueList = new StringBuilder();
796    queueList.append("Flush Queue Queue dump:\n");
797    queueList.append("  Flush Queue:\n");
798    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
799
800    while(it.hasNext()){
801      queueList.append("    "+it.next().toString());
802      queueList.append("\n");
803    }
804
805    return queueList.toString();
806  }
807
808  /**
809   * Register a MemstoreFlushListener
810   * @param listener
811   */
812  @Override
813  public void registerFlushRequestListener(final FlushRequestListener listener) {
814    this.flushRequestListeners.add(listener);
815  }
816
817  /**
818   * Unregister the listener from MemstoreFlushListeners
819   * @param listener
820   * @return true when passed listener is unregistered successfully.
821   */
822  @Override
823  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
824    return this.flushRequestListeners.remove(listener);
825  }
826
827  /**
828   * Sets the global memstore limit to a new size.
829   * @param globalMemStoreSize
830   */
831  @Override
832  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
833    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
834    reclaimMemStoreMemory();
835  }
836
837  interface FlushQueueEntry extends Delayed {
838  }
839
840  /**
841   * Datastructure used in the flush queue.  Holds region and retry count.
842   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
843   * construction, the delay is zero. When added to a delay queue, we'll come
844   * out near immediately.  Call {@link #requeue(long)} passing delay in
845   * milliseconds before readding to delay queue if you want it to stay there
846   * a while.
847   */
848  static class FlushRegionEntry implements FlushQueueEntry {
849    private final HRegion region;
850
851    private final long createTime;
852    private long whenToExpire;
853    private int requeueCount = 0;
854
855    private final List<byte[]> families;
856
857    private final FlushLifeCycleTracker tracker;
858
859    FlushRegionEntry(final HRegion r, List<byte[]> families,
860        FlushLifeCycleTracker tracker) {
861      this.region = r;
862      this.createTime = EnvironmentEdgeManager.currentTime();
863      this.whenToExpire = this.createTime;
864      this.families = families;
865      this.tracker = tracker;
866    }
867
868    /**
869     * @param maximumWait
870     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
871     */
872    public boolean isMaximumWait(final long maximumWait) {
873      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
874    }
875
876    /**
877     * @return Count of times {@link #requeue(long)} was called; i.e this is
878     * number of times we've been requeued.
879     */
880    public int getRequeueCount() {
881      return this.requeueCount;
882    }
883
884    public FlushLifeCycleTracker getTracker() {
885      return tracker;
886    }
887
888    /**
889     * @param when When to expire, when to come up out of the queue.
890     * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
891     * to whatever you pass.
892     * @return This.
893     */
894    public FlushRegionEntry requeue(final long when) {
895      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
896      this.requeueCount++;
897      return this;
898    }
899
900    @Override
901    public long getDelay(TimeUnit unit) {
902      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
903          TimeUnit.MILLISECONDS);
904    }
905
906    @Override
907    public int compareTo(Delayed other) {
908      // Delay is compared first. If there is a tie, compare region's hash code
909      int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
910        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
911      if (ret != 0) {
912        return ret;
913      }
914      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
915      return hashCode() - otherEntry.hashCode();
916    }
917
918    @Override
919    public String toString() {
920      return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]";
921    }
922
923    @Override
924    public int hashCode() {
925      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
926      return hash ^ region.hashCode();
927    }
928
929   @Override
930    public boolean equals(Object obj) {
931      if (this == obj) {
932        return true;
933      }
934      if (obj == null || getClass() != obj.getClass()) {
935        return false;
936      }
937      FlushRegionEntry other = (FlushRegionEntry) obj;
938      if (!Bytes.equals(this.region.getRegionInfo().getRegionName(),
939          other.region.getRegionInfo().getRegionName())) {
940        return false;
941      }
942      return compareTo(other) == 0;
943    }
944  }
945}