001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.ArrayList;
024import java.util.ConcurrentModificationException;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.SortedMap;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.DelayQueue;
033import java.util.concurrent.Delayed;
034import java.util.concurrent.ThreadFactory;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.LongAdder;
038import java.util.concurrent.locks.ReentrantReadWriteLock;
039
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.DroppedSnapshotException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.client.RegionReplicaUtil;
044import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
045import org.apache.hadoop.hbase.trace.TraceUtil;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.util.HasThread;
049import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.ipc.RemoteException;
052import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
053import org.apache.htrace.core.TraceScope;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
 * Flushes memstores on request, using a set of flush handler threads.
 *
 * NOTE: The flush handlers run on their own threads rather than as Chores
 * because their wait on the flush queue can be cut short when there is
 * something to do, whereas the Chore sleep time is invariant.
064 *
065 * @see FlushRequester
066 */
067@InterfaceAudience.Private
068class MemStoreFlusher implements FlushRequester {
069  private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);
070
  // Configuration handed to the constructor; also consulted later for the
  // region-replica store file refresh settings.
  private Configuration conf;
  // These two data members go together.  Any entry in the one must have
  // a corresponding entry in the other. requestFlush/requestDelayedFlush add to
  // both while synchronized on regionsInQueue.
  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  // Set when a WAKEUPFLUSH_INSTANCE token has been queued; a flush handler
  // resets it just before polling so that another token may be queued.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  // Upper bound, in milliseconds, on how long a flush handler waits on the queue.
  private final long threadWakeFrequency;
  private final HRegionServer server;
  // Flushes run under the read lock; interruptIfNecessary() takes the write
  // lock before interrupting the handlers.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Monitor that reclaimMemStoreMemory() waits on and wakeUpIfBlocking() notifies.
  private final Object blockSignal = new Object();

  // Read from "hbase.hstore.blockingWaitTime" (milliseconds); how long a flush
  // may be put off because a region has too many store files.
  private long blockingWaitTime;
  // Accumulated milliseconds that callers of reclaimMemStoreMemory() spent blocked.
  private final LongAdder updatesBlockedMsHighWater = new LongAdder();

  // Sized from "hbase.hstore.flusher.count"; slots are filled in by start().
  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);

  // Last flush type passed to setFlushType(); remains null until then.
  private FlushType flushType;
090
091  /**
092   * Singleton instance inserted into flush queue used for signaling.
093   */
094  private static final FlushQueueEntry WAKEUPFLUSH_INSTANCE = new FlushQueueEntry() {
095    @Override
096    public long getDelay(TimeUnit unit) {
097      return 0;
098    }
099
100    @Override
101    public int compareTo(Delayed o) {
102      return -1;
103    }
104
105    @Override
106    public boolean equals(Object obj) {
107      return obj == this;
108    }
109
110    @Override
111    public int hashCode() {
112      return 42;
113    }
114  };
115
116
117  /**
118   * @param conf
119   * @param server
120   */
121  public MemStoreFlusher(final Configuration conf,
122      final HRegionServer server) {
123    super();
124    this.conf = conf;
125    this.server = server;
126    this.threadWakeFrequency =
127        conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
128    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
129      90000);
130    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
131    this.flushHandlers = new FlushHandler[handlerCount];
132    LOG.info("globalMemStoreLimit="
133        + TraditionalBinaryPrefix
134            .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
135        + ", globalMemStoreLimitLowMark="
136        + TraditionalBinaryPrefix.long2String(
137          this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
138        + ", Offheap="
139        + (this.server.getRegionServerAccounting().isOffheap()));
140  }
141
142  public LongAdder getUpdatesBlockedMsHighWater() {
143    return this.updatesBlockedMsHighWater;
144  }
145
146  public void setFlushType(FlushType flushType) {
147    this.flushType = flushType;
148  }
149
150  /**
151   * The memstore across all regions has exceeded the low water mark. Pick
152   * one region to flush and flush it synchronously (this is called from the
153   * flush thread)
154   * @return true if successful
155   */
156  private boolean flushOneForGlobalPressure() {
157    SortedMap<Long, HRegion> regionsBySize = null;
158    switch(flushType) {
159      case ABOVE_OFFHEAP_HIGHER_MARK:
160      case ABOVE_OFFHEAP_LOWER_MARK:
161        regionsBySize = server.getCopyOfOnlineRegionsSortedByOffHeapSize();
162        break;
163      case ABOVE_ONHEAP_HIGHER_MARK:
164      case ABOVE_ONHEAP_LOWER_MARK:
165      default:
166        regionsBySize = server.getCopyOfOnlineRegionsSortedByOnHeapSize();
167    }
168    Set<HRegion> excludedRegions = new HashSet<>();
169
170    double secondaryMultiplier
171      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
172
173    boolean flushedOne = false;
174    while (!flushedOne) {
175      // Find the biggest region that doesn't have too many storefiles (might be null!)
176      HRegion bestFlushableRegion =
177          getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
178      // Find the biggest region, total, even if it might have too many flushes.
179      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
180      // Find the biggest region that is a secondary region
181      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
182      if (bestAnyRegion == null) {
183        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
184        bestAnyRegion = bestRegionReplica;
185      }
186      if (bestAnyRegion == null) {
187        LOG.error("Above memory mark but there are no flushable regions!");
188        return false;
189      }
190
191      HRegion regionToFlush;
192      long bestAnyRegionSize;
193      long bestFlushableRegionSize;
194      switch(flushType) {
195        case ABOVE_OFFHEAP_HIGHER_MARK:
196        case ABOVE_OFFHEAP_LOWER_MARK:
197          bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
198          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
199          break;
200
201        case ABOVE_ONHEAP_HIGHER_MARK:
202        case ABOVE_ONHEAP_LOWER_MARK:
203          bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
204          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
205          break;
206
207        default:
208          bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
209          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
210      }
211      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
212        // Even if it's not supposed to be flushed, pick a region if it's more than twice
213        // as big as the best flushable one - otherwise when we're under pressure we make
214        // lots of little flushes and cause lots of compactions, etc, which just makes
215        // life worse!
216        if (LOG.isDebugEnabled()) {
217          LOG.debug("Under global heap pressure: " + "Region "
218              + bestAnyRegion.getRegionInfo().getRegionNameAsString()
219              + " has too many " + "store files, but is "
220              + TraditionalBinaryPrefix.long2String(bestAnyRegionSize, "", 1)
221              + " vs best flushable region's "
222              + TraditionalBinaryPrefix.long2String(
223              bestFlushableRegionSize, "", 1)
224              + ". Choosing the bigger.");
225        }
226        regionToFlush = bestAnyRegion;
227      } else {
228        if (bestFlushableRegion == null) {
229          regionToFlush = bestAnyRegion;
230        } else {
231          regionToFlush = bestFlushableRegion;
232        }
233      }
234
235      long regionToFlushSize;
236      long bestRegionReplicaSize;
237      switch(flushType) {
238        case ABOVE_OFFHEAP_HIGHER_MARK:
239        case ABOVE_OFFHEAP_LOWER_MARK:
240          regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
241          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
242          break;
243
244        case ABOVE_ONHEAP_HIGHER_MARK:
245        case ABOVE_ONHEAP_LOWER_MARK:
246          regionToFlushSize = regionToFlush.getMemStoreHeapSize();
247          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
248          break;
249
250        default:
251          regionToFlushSize = regionToFlush.getMemStoreDataSize();
252          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
253      }
254
255      if ((regionToFlush == null || regionToFlushSize == 0) && bestRegionReplicaSize == 0) {
256        // A concurrency issue (such as splitting region) may happen such that the online region
257        // seen by getCopyOfOnlineRegionsSortedByXX() method is no longer eligible to
258        // getBiggestMemStoreRegion(). This means that we can come out of the loop
259        LOG.debug("Above memory mark but there is no flushable region");
260        return false;
261      }
262
263      if (regionToFlush == null ||
264          (bestRegionReplica != null &&
265           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
266           (bestRegionReplicaSize > secondaryMultiplier * regionToFlushSize))) {
267        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
268            " due to global heap pressure. Total memstore off heap size=" +
269            TraditionalBinaryPrefix.long2String(
270              server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
271            " memstore heap size=" + TraditionalBinaryPrefix.long2String(
272              server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
273        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
274        if (!flushedOne) {
275          LOG.info("Excluding secondary region " + bestRegionReplica +
276              " - trying to find a different region to refresh files.");
277          excludedRegions.add(bestRegionReplica);
278        }
279      } else {
280        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
281            "Flush type=" + flushType.toString() +
282            ", Total Memstore Heap size=" +
283            TraditionalBinaryPrefix.long2String(
284                server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1) +
285            ", Total Memstore Off-Heap size=" +
286            TraditionalBinaryPrefix.long2String(
287                server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
288            ", Region memstore size=" +
289            TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
290        flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
291
292        if (!flushedOne) {
293          LOG.info("Excluding unflushable region " + regionToFlush +
294              " - trying to find a different region to flush.");
295          excludedRegions.add(regionToFlush);
296        }
297      }
298    }
299    return true;
300  }
301
302  /**
303   * @return Return memstore offheap size or null if <code>r</code> is null
304   */
305  private static long getMemStoreOffHeapSize(HRegion r) {
306    return r == null? 0: r.getMemStoreOffHeapSize();
307  }
308
309  /**
310   * @return Return memstore heap size or null if <code>r</code> is null
311   */
312  private static long getMemStoreHeapSize(HRegion r) {
313    return r == null? 0: r.getMemStoreHeapSize();
314  }
315
316  /**
317   * @return Return memstore data size or null if <code>r</code> is null
318   */
319  private static long getMemStoreDataSize(HRegion r) {
320    return r == null? 0: r.getMemStoreDataSize();
321  }
322
323  private class FlushHandler extends HasThread {
324
325    private FlushHandler(String name) {
326      super(name);
327    }
328
329    @Override
330    public void run() {
331      while (!server.isStopped()) {
332        FlushQueueEntry fqe = null;
333        try {
334          wakeupPending.set(false); // allow someone to wake us up again
335          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
336          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
337            FlushType type = isAboveLowWaterMark();
338            if (type != FlushType.NORMAL) {
339              LOG.debug("Flush thread woke up because memory above low water="
340                  + TraditionalBinaryPrefix.long2String(
341                    server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1));
342              // For offheap memstore, even if the lower water mark was breached due to heap overhead
343              // we still select the regions based on the region's memstore data size.
344              // TODO : If we want to decide based on heap over head it can be done without tracking
345              // it per region.
346              if (!flushOneForGlobalPressure()) {
347                // Wasn't able to flush any region, but we're above low water mark
348                // This is unlikely to happen, but might happen when closing the
349                // entire server - another thread is flushing regions. We'll just
350                // sleep a little bit to avoid spinning, and then pretend that
351                // we flushed one, so anyone blocked will check again
352                Thread.sleep(1000);
353                wakeUpIfBlocking();
354              }
355              // Enqueue another one of these tokens so we'll wake up again
356              wakeupFlushThread();
357            }
358            continue;
359          }
360          FlushRegionEntry fre = (FlushRegionEntry) fqe;
361          if (!flushRegion(fre)) {
362            break;
363          }
364        } catch (InterruptedException ex) {
365          continue;
366        } catch (ConcurrentModificationException ex) {
367          continue;
368        } catch (Exception ex) {
369          LOG.error("Cache flusher failed for entry " + fqe, ex);
370          if (!server.checkFileSystem()) {
371            break;
372          }
373        }
374      }
375      synchronized (regionsInQueue) {
376        regionsInQueue.clear();
377        flushQueue.clear();
378      }
379
380      // Signal anyone waiting, so they see the close flag
381      wakeUpIfBlocking();
382      LOG.info(getName() + " exiting");
383    }
384  }
385
386
387  private void wakeupFlushThread() {
388    if (wakeupPending.compareAndSet(false, true)) {
389      flushQueue.add(WAKEUPFLUSH_INSTANCE);
390    }
391  }
392
393  private HRegion getBiggestMemStoreRegion(
394      SortedMap<Long, HRegion> regionsBySize,
395      Set<HRegion> excludedRegions,
396      boolean checkStoreFileCount) {
397    synchronized (regionsInQueue) {
398      for (HRegion region : regionsBySize.values()) {
399        if (excludedRegions.contains(region)) {
400          continue;
401        }
402
403        if (region.writestate.flushing || !region.writestate.writesEnabled) {
404          continue;
405        }
406
407        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
408          continue;
409        }
410        return region;
411      }
412    }
413    return null;
414  }
415
416  private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
417      Set<HRegion> excludedRegions) {
418    synchronized (regionsInQueue) {
419      for (HRegion region : regionsBySize.values()) {
420        if (excludedRegions.contains(region)) {
421          continue;
422        }
423
424        if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
425          continue;
426        }
427
428        return region;
429      }
430    }
431    return null;
432  }
433
434  private boolean refreshStoreFilesAndReclaimMemory(Region region) {
435    try {
436      return region.refreshStoreFiles();
437    } catch (IOException e) {
438      LOG.warn("Refreshing store files failed with exception", e);
439    }
440    return false;
441  }
442
443  /**
444   * Return true if global memory usage is above the high watermark
445   */
446  private FlushType isAboveHighWaterMark() {
447    return server.getRegionServerAccounting().isAboveHighWaterMark();
448  }
449
450  /**
451   * Return true if we're above the low watermark
452   */
453  private FlushType isAboveLowWaterMark() {
454    return server.getRegionServerAccounting().isAboveLowWaterMark();
455  }
456
457  @Override
458  public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
459                              FlushLifeCycleTracker tracker) {
460    synchronized (regionsInQueue) {
461      if (!regionsInQueue.containsKey(r)) {
462        // This entry has no delay so it will be added at the top of the flush
463        // queue. It'll come out near immediately.
464        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
465        this.regionsInQueue.put(r, fqe);
466        this.flushQueue.add(fqe);
467        r.incrementFlushesQueuedCount();
468        return true;
469      } else {
470        tracker.notExecuted("Flush already requested on " + r);
471        return false;
472      }
473    }
474  }
475
476  @Override
477  public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
478    synchronized (regionsInQueue) {
479      if (!regionsInQueue.containsKey(r)) {
480        // This entry has some delay
481        FlushRegionEntry fqe =
482            new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
483        fqe.requeue(delay);
484        this.regionsInQueue.put(r, fqe);
485        this.flushQueue.add(fqe);
486        r.incrementFlushesQueuedCount();
487        return true;
488      }
489      return false;
490    }
491  }
492
493  public int getFlushQueueSize() {
494    return flushQueue.size();
495  }
496
497  /**
498   * Only interrupt once it's done with a run through the work loop.
499   */
500  void interruptIfNecessary() {
501    lock.writeLock().lock();
502    try {
503      for (FlushHandler flushHander : flushHandlers) {
504        if (flushHander != null) flushHander.interrupt();
505      }
506    } finally {
507      lock.writeLock().unlock();
508    }
509  }
510
511  synchronized void start(UncaughtExceptionHandler eh) {
512    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
513        server.getServerName().toShortString() + "-MemStoreFlusher", eh);
514    for (int i = 0; i < flushHandlers.length; i++) {
515      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
516      flusherThreadFactory.newThread(flushHandlers[i]);
517      flushHandlers[i].start();
518    }
519  }
520
521  boolean isAlive() {
522    for (FlushHandler flushHander : flushHandlers) {
523      if (flushHander != null && flushHander.isAlive()) {
524        return true;
525      }
526    }
527    return false;
528  }
529
530  void join() {
531    for (FlushHandler flushHander : flushHandlers) {
532      if (flushHander != null) {
533        Threads.shutdown(flushHander.getThread());
534      }
535    }
536  }
537
538  /**
539   * A flushRegion that checks store file count.  If too many, puts the flush
540   * on delay queue to retry later.
541   * @param fqe
542   * @return true if the region was successfully flushed, false otherwise. If
543   * false, there will be accompanying log messages explaining why the region was
544   * not flushed.
545   */
546  private boolean flushRegion(final FlushRegionEntry fqe) {
547    HRegion region = fqe.region;
548    if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) {
549      if (fqe.isMaximumWait(this.blockingWaitTime)) {
550        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
551          "ms on a compaction to clean up 'too many store files'; waited " +
552          "long enough... proceeding with flush of " +
553          region.getRegionInfo().getRegionNameAsString());
554      } else {
555        // If this is first time we've been put off, then emit a log message.
556        if (fqe.getRequeueCount() <= 0) {
557          // Note: We don't impose blockingStoreFiles constraint on meta regions
558          LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
559              region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
560              this.blockingWaitTime);
561          if (!this.server.compactSplitThread.requestSplit(region)) {
562            try {
563              this.server.compactSplitThread.requestSystemCompaction(region,
564                Thread.currentThread().getName());
565            } catch (IOException e) {
566              e = e instanceof RemoteException ?
567                      ((RemoteException)e).unwrapRemoteException() : e;
568              LOG.error("Cache flush failed for region " +
569                Bytes.toStringBinary(region.getRegionInfo().getRegionName()), e);
570            }
571          }
572        }
573
574        // Put back on the queue.  Have it come back out of the queue
575        // after a delay of this.blockingWaitTime / 100 ms.
576        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
577        // Tell a lie, it's not flushed but it's ok
578        return true;
579      }
580    }
581    return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
582  }
583
584  /**
585   * Flush a region.
586   * @param region Region to flush.
587   * @param emergencyFlush Set if we are being force flushed. If true the region
588   * needs to be removed from the flush queue. If false, when we were called
589   * from the main flusher run loop and we got the entry to flush by calling
590   * poll on the flush queue (which removed it).
591   * @param forceFlushAllStores whether we want to flush all store.
592   * @return true if the region was successfully flushed, false otherwise. If
593   * false, there will be accompanying log messages explaining why the region was
594   * not flushed.
595   */
596  private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
597      FlushLifeCycleTracker tracker) {
598    synchronized (this.regionsInQueue) {
599      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
600      // Use the start time of the FlushRegionEntry if available
601      if (fqe != null && emergencyFlush) {
602        // Need to remove from region from delay queue. When NOT an
603        // emergencyFlush, then item was removed via a flushQueue.poll.
604        flushQueue.remove(fqe);
605      }
606    }
607
608    tracker.beforeExecution();
609    lock.readLock().lock();
610    try {
611      notifyFlushRequest(region, emergencyFlush);
612      FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
613      boolean shouldCompact = flushResult.isCompactionNeeded();
614      // We just want to check the size
615      boolean shouldSplit = region.checkSplit() != null;
616      if (shouldSplit) {
617        this.server.compactSplitThread.requestSplit(region);
618      } else if (shouldCompact) {
619        server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
620      }
621    } catch (DroppedSnapshotException ex) {
622      // Cache flush can fail in a few places. If it fails in a critical
623      // section, we get a DroppedSnapshotException and a replay of wal
624      // is required. Currently the only way to do this is a restart of
625      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
626      // where hdfs was bad but passed the hdfs check).
627      server.abort("Replay of WAL required. Forcing server shutdown", ex);
628      return false;
629    } catch (IOException ex) {
630      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
631      LOG.error(
632        "Cache flush failed"
633            + (region != null ? (" for region " +
634                Bytes.toStringBinary(region.getRegionInfo().getRegionName()))
635              : ""), ex);
636      if (!server.checkFileSystem()) {
637        return false;
638      }
639    } finally {
640      lock.readLock().unlock();
641      wakeUpIfBlocking();
642      tracker.afterExecution();
643    }
644    return true;
645  }
646
647  private void notifyFlushRequest(Region region, boolean emergencyFlush) {
648    FlushType type = null;
649    if (emergencyFlush) {
650      type = isAboveHighWaterMark();
651      if (type == null) {
652        type = isAboveLowWaterMark();
653      }
654    }
655    for (FlushRequestListener listener : flushRequestListeners) {
656      listener.flushRequested(type, region);
657    }
658  }
659
660  private void wakeUpIfBlocking() {
661    synchronized (blockSignal) {
662      blockSignal.notifyAll();
663    }
664  }
665
666  private boolean isTooManyStoreFiles(Region region) {
667
668    // When compaction is disabled, the region is flushable
669    if (!region.getTableDescriptor().isCompactionEnabled()) {
670      return false;
671    }
672
673    for (Store store : region.getStores()) {
674      if (store.hasTooManyStoreFiles()) {
675        return true;
676      }
677    }
678    return false;
679  }
680
681  private int getStoreFileCount(Region region) {
682    int count = 0;
683    for (Store store : region.getStores()) {
684      count += store.getStorefilesCount();
685    }
686    return count;
687  }
688
  /**
   * Check if the regionserver's memstore memory usage is greater than the
   * limit. If so, flush regions with the biggest memstores until we're down
   * to the lower limit. This method blocks callers until we're down to a safe
   * amount of memstore consumption.
   * <p>
   * While above the high water mark the caller repeatedly wakes the flush
   * thread and waits on the block signal (at most 5 seconds at a time) until
   * the high water mark check returns {@link FlushType#NORMAL} or the server is
   * stopped. When only the low water mark is exceeded, the flush thread is
   * woken and the caller returns without blocking.
   */
  public void reclaimMemStoreMemory() {
    try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) {
      FlushType flushType = isAboveHighWaterMark();
      if (flushType != FlushType.NORMAL) {
        TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
        long start = EnvironmentEdgeManager.currentTime();
        long nextLogTimeMs = start;
        synchronized (this.blockSignal) {
          boolean blocked = false;
          long startTime = 0;
          boolean interrupted = false;
          try {
            // Re-check now that the monitor is held.
            flushType = isAboveHighWaterMark();
            while (flushType != FlushType.NORMAL && !server.isStopped()) {
              server.cacheFlusher.setFlushType(flushType);
              // Only on the first pass: note when blocking began and log why.
              if (!blocked) {
                startTime = EnvironmentEdgeManager.currentTime();
                if (!server.getRegionServerAccounting().isOffheap()) {
                  logMsg("global memstore heapsize",
                      server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                      server.getRegionServerAccounting().getGlobalMemStoreLimit());
                } else {
                  switch (flushType) {
                    case ABOVE_OFFHEAP_HIGHER_MARK:
                      logMsg("the global offheap memstore datasize",
                          server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
                          server.getRegionServerAccounting().getGlobalMemStoreLimit());
                      break;
                    case ABOVE_ONHEAP_HIGHER_MARK:
                      logMsg("global memstore heapsize",
                          server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
                          server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
                      break;
                    default:
                      break;
                  }
                }
              }
              blocked = true;
              wakeupFlushThread();
              try {
                // we should be able to wait forever, but we've seen a bug where
                // we miss a notify, so put a 5 second bound on it at least.
                blockSignal.wait(5 * 1000);
              } catch (InterruptedException ie) {
                // Remember the interrupt and keep waiting; the interrupt status
                // is restored in the finally block below.
                LOG.warn("Interrupted while waiting");
                interrupted = true;
              }
              // Warn about the ongoing block, at most once a second.
              long nowMs = EnvironmentEdgeManager.currentTime();
              if (nowMs >= nextLogTimeMs) {
                LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
                nextLogTimeMs = nowMs + 1000;
              }
              flushType = isAboveHighWaterMark();
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }

          // Account for the time spent blocked, if any.
          if(blocked){
            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
            if(totalTime > 0){
              this.updatesBlockedMsHighWater.add(totalTime);
            }
            LOG.info("Unblocking updates for server " + server.toString());
          }
        }
      } else {
        // Below the high water mark: if the low water mark is exceeded, just
        // wake the flush thread and let the caller carry on.
        flushType = isAboveLowWaterMark();
        if (flushType != FlushType.NORMAL) {
          server.cacheFlusher.setFlushType(flushType);
          wakeupFlushThread();
        }
      }
    }
  }
773
774  private void logMsg(String type, long val, long max) {
775    LOG.info("Blocking updates: {} {} is >= blocking {}", type,
776        TraditionalBinaryPrefix.long2String(val, "", 1),
777        TraditionalBinaryPrefix.long2String(max, "", 1));
778  }
779
780  @Override
781  public String toString() {
782    return "flush_queue="
783        + flushQueue.size();
784  }
785
786  public String dumpQueue() {
787    StringBuilder queueList = new StringBuilder();
788    queueList.append("Flush Queue Queue dump:\n");
789    queueList.append("  Flush Queue:\n");
790    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
791
792    while(it.hasNext()){
793      queueList.append("    "+it.next().toString());
794      queueList.append("\n");
795    }
796
797    return queueList.toString();
798  }
799
800  /**
801   * Register a MemstoreFlushListener
802   * @param listener
803   */
804  @Override
805  public void registerFlushRequestListener(final FlushRequestListener listener) {
806    this.flushRequestListeners.add(listener);
807  }
808
809  /**
810   * Unregister the listener from MemstoreFlushListeners
811   * @param listener
812   * @return true when passed listener is unregistered successfully.
813   */
814  @Override
815  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
816    return this.flushRequestListeners.remove(listener);
817  }
818
819  /**
820   * Sets the global memstore limit to a new size.
821   * @param globalMemStoreSize
822   */
823  @Override
824  public void setGlobalMemStoreLimit(long globalMemStoreSize) {
825    this.server.getRegionServerAccounting().setGlobalMemStoreLimits(globalMemStoreSize);
826    reclaimMemStoreMemory();
827  }
828
  /**
   * Type of the entries held in the flush queue. It declares no methods of its own beyond
   * those inherited from {@link Delayed}.
   */
  interface FlushQueueEntry extends Delayed {
  }
831
832  /**
833   * Datastructure used in the flush queue.  Holds region and retry count.
834   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
835   * construction, the delay is zero. When added to a delay queue, we'll come
836   * out near immediately.  Call {@link #requeue(long)} passing delay in
837   * milliseconds before readding to delay queue if you want it to stay there
838   * a while.
839   */
840  static class FlushRegionEntry implements FlushQueueEntry {
841    private final HRegion region;
842
843    private final long createTime;
844    private long whenToExpire;
845    private int requeueCount = 0;
846
847    private final boolean forceFlushAllStores;
848
849    private final FlushLifeCycleTracker tracker;
850
851    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
852      this.region = r;
853      this.createTime = EnvironmentEdgeManager.currentTime();
854      this.whenToExpire = this.createTime;
855      this.forceFlushAllStores = forceFlushAllStores;
856      this.tracker = tracker;
857    }
858
859    /**
860     * @param maximumWait
861     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
862     */
863    public boolean isMaximumWait(final long maximumWait) {
864      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
865    }
866
867    /**
868     * @return Count of times {@link #requeue(long)} was called; i.e this is
869     * number of times we've been requeued.
870     */
871    public int getRequeueCount() {
872      return this.requeueCount;
873    }
874
875    /**
876     * @return whether we need to flush all stores.
877     */
878    public boolean isForceFlushAllStores() {
879      return forceFlushAllStores;
880    }
881
882    public FlushLifeCycleTracker getTracker() {
883      return tracker;
884    }
885
886    /**
887     * @param when When to expire, when to come up out of the queue.
888     * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
889     * to whatever you pass.
890     * @return This.
891     */
892    public FlushRegionEntry requeue(final long when) {
893      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
894      this.requeueCount++;
895      return this;
896    }
897
898    @Override
899    public long getDelay(TimeUnit unit) {
900      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
901          TimeUnit.MILLISECONDS);
902    }
903
904    @Override
905    public int compareTo(Delayed other) {
906      // Delay is compared first. If there is a tie, compare region's hash code
907      int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
908        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
909      if (ret != 0) {
910        return ret;
911      }
912      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
913      return hashCode() - otherEntry.hashCode();
914    }
915
916    @Override
917    public String toString() {
918      return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]";
919    }
920
921    @Override
922    public int hashCode() {
923      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
924      return hash ^ region.hashCode();
925    }
926
927   @Override
928    public boolean equals(Object obj) {
929      if (this == obj) {
930        return true;
931      }
932      if (obj == null || getClass() != obj.getClass()) {
933        return false;
934      }
935      FlushRegionEntry other = (FlushRegionEntry) obj;
936      if (!Bytes.equals(this.region.getRegionInfo().getRegionName(),
937          other.region.getRegionInfo().getRegionName())) {
938        return false;
939      }
940      return compareTo(other) == 0;
941    }
942  }
943}