
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.util.StringUtils.humanReadableInt;
22  
23  import java.io.IOException;
24  import java.lang.Thread.UncaughtExceptionHandler;
25  import java.lang.management.ManagementFactory;
26  import java.util.ArrayList;
27  import java.util.ConcurrentModificationException;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.SortedMap;
34  import java.util.concurrent.BlockingQueue;
35  import java.util.concurrent.DelayQueue;
36  import java.util.concurrent.Delayed;
37  import java.util.concurrent.ThreadFactory;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.locks.ReentrantReadWriteLock;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.DroppedSnapshotException;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
49  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.Counter;
52  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
53  import org.apache.hadoop.hbase.util.HasThread;
54  import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
55  import org.apache.hadoop.hbase.util.Threads;
56  import org.apache.hadoop.ipc.RemoteException;
57  import org.apache.hadoop.util.StringUtils;
58  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
59  import org.apache.htrace.Trace;
60  import org.apache.htrace.TraceScope;
61  
62  import com.google.common.base.Preconditions;
63  
64  /**
65   * Flushes memstores on request, using a pool of flush handler threads.
66   *
67   * NOTE: The flush handlers poll a DelayQueue rather than running as a Chore
68   * because the wait can be cut short when there is something to do, whereas a
69   * Chore's sleep time is invariant.
70   *
71   * @see FlushRequester
72   */
73  @InterfaceAudience.Private
74  class MemStoreFlusher implements FlushRequester {
75    static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
76  
77    private Configuration conf;
78    // These two data members go together.  Any entry in the one must have
79    // a corresponding entry in the other.
80    private final BlockingQueue<FlushQueueEntry> flushQueue =
81      new DelayQueue<FlushQueueEntry>();
82    private final Map<HRegion, FlushRegionEntry> regionsInQueue =
83      new HashMap<HRegion, FlushRegionEntry>();
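      // flushQueue is a DelayQueue: an entry is handed to a flush handler only once its
      // delay has expired, which is how delayed and re-queued flushes are implemented.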
84    private AtomicBoolean wakeupPending = new AtomicBoolean();
85  
86    private final long threadWakeFrequency;
87    private final HRegionServer server;
88    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
89    private final Object blockSignal = new Object();
90  
91    protected long globalMemStoreLimit;
92    protected float globalMemStoreLimitLowMarkPercent;
93    protected long globalMemStoreLimitLowMark;
94  
95    private long blockingWaitTime;
96    private final Counter updatesBlockedMsHighWater = new Counter();
97  
98    private final FlushHandler[] flushHandlers;
99    private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
100 
101   /**
102    * @param conf the region server configuration
103    * @param server the region server this flusher is for
104    */
105   public MemStoreFlusher(final Configuration conf,
106       final HRegionServer server) {
107     super();
108     this.conf = conf;
109     this.server = server;
110     this.threadWakeFrequency =
111       conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
112     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
113     float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
114     this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
115     this.globalMemStoreLimitLowMarkPercent =
116         HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
117     this.globalMemStoreLimitLowMark =
118         (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
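        // Illustrative numbers only: with a 4 GB max heap, a 0.4 global memstore fraction
        // and a 0.95 lower-mark ratio, the limit works out to ~1.6 GB and the low mark
        // to ~1.52 GB.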
119 
120     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
121       90000);
122     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
123     this.flushHandlers = new FlushHandler[handlerCount];
124     LOG.info("globalMemStoreLimit="
125         + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
126         + ", globalMemStoreLimitLowMark="
127         + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
128         + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
129   }
130 
131   public Counter getUpdatesBlockedMsHighWater() {
132     return this.updatesBlockedMsHighWater;
133   }
134 
135   /**
136    * The memstore across all regions has exceeded the low water mark. Pick
137    * one region to flush and flush it synchronously (this is called from the
138    * flush thread)
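       *
       * <p>Selection policy, as implemented below: prefer the biggest region whose store
       * file count is under the blocking limit; fall back to the biggest region overall
       * when it is more than twice that size; and when a secondary region replica's
       * memstore dominates, refresh its store files instead of flushing.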
139    * @return true if successful
140    */
141   private boolean flushOneForGlobalPressure() {
142     SortedMap<Long, HRegion> regionsBySize =
143         server.getCopyOfOnlineRegionsSortedBySize();
144 
145     Set<HRegion> excludedRegions = new HashSet<HRegion>();
146 
147     double secondaryMultiplier
148       = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
149 
150     boolean flushedOne = false;
151     while (!flushedOne) {
152       // Find the biggest region that doesn't have too many storefiles
153       // (might be null!)
154       HRegion bestFlushableRegion = getBiggestMemstoreRegion(
155           regionsBySize, excludedRegions, true);
156       // Find the biggest region, total, even if it might have too many store files.
157       HRegion bestAnyRegion = getBiggestMemstoreRegion(
158           regionsBySize, excludedRegions, false);
159       // Find the biggest region that is a secondary region
160       HRegion bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
161         excludedRegions);
162 
163       if (bestAnyRegion == null && bestRegionReplica == null) {
164         LOG.error("Above memory mark but there are no flushable regions!");
165         return false;
166       }
167 
168       HRegion regionToFlush;
169       if (bestFlushableRegion != null &&
170           bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
171         // Even if it's not supposed to be flushed, pick a region if it's more than twice
172         // as big as the best flushable one - otherwise when we're under pressure we make
173         // lots of little flushes and cause lots of compactions, etc, which just makes
174         // life worse!
175         if (LOG.isDebugEnabled()) {
176           LOG.debug("Under global heap pressure: " + "Region "
177               + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is "
178               + TraditionalBinaryPrefix.long2String(bestAnyRegion.memstoreSize.get(), "", 1)
179               + " vs best flushable region's "
180               + TraditionalBinaryPrefix.long2String(bestFlushableRegion.memstoreSize.get(), "", 1)
181               + ". Choosing the bigger.");
182         }
183         regionToFlush = bestAnyRegion;
184       } else {
185         if (bestFlushableRegion == null) {
186           regionToFlush = bestAnyRegion;
187         } else {
188           regionToFlush = bestFlushableRegion;
189         }
190       }
191 
192       Preconditions.checkState(
193         (regionToFlush != null && regionToFlush.memstoreSize.get() > 0) ||
194         (bestRegionReplica != null && bestRegionReplica.memstoreSize.get() > 0));
195 
196       if (regionToFlush == null ||
197           (bestRegionReplica != null &&
198            ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
199            (bestRegionReplica.memstoreSize.get()
200                > secondaryMultiplier * regionToFlush.memstoreSize.get()))) {
201       LOG.info("Refreshing storefiles of region " + bestRegionReplica +
202           " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
203             server.getRegionServerAccounting().getGlobalMemstoreSize()));
204         flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
205         if (!flushedOne) {
206         LOG.info("Excluding secondary region " + bestRegionReplica +
207               " - trying to find a different region to refresh files.");
208           excludedRegions.add(bestRegionReplica);
209         }
210       } else {
211         LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
212             + "Total Memstore size="
213             + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
214             + ", Region memstore size="
215             + humanReadableInt(regionToFlush.memstoreSize.get()));
216         flushedOne = flushRegion(regionToFlush, true, true);
217 
218         if (!flushedOne) {
219           LOG.info("Excluding unflushable region " + regionToFlush +
220               " - trying to find a different region to flush.");
221           excludedRegions.add(regionToFlush);
222         }
223       }
224     }
225     return true;
226   }
227 
228   private class FlushHandler extends HasThread {
229 
230     private FlushHandler(String name) {
231       super(name);
232     }
233 
234     @Override
235     public void run() {
236       while (!server.isStopped()) {
237         FlushQueueEntry fqe = null;
238         try {
239           wakeupPending.set(false); // allow someone to wake us up again
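              // Wait for work, but give up at least every threadWakeFrequency ms so we
              // can re-check global memory pressure even when nothing was queued.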
240           fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
241           if (fqe == null || fqe instanceof WakeupFlushThread) {
242             if (isAboveLowWaterMark()) {
243               LOG.debug("Flush thread woke up because memory above low water="
244                   + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
245               if (!flushOneForGlobalPressure()) {
246                 // Wasn't able to flush any region, but we're above low water mark
247                 // This is unlikely to happen, but might happen when closing the
248                 // entire server - another thread is flushing regions. We'll just
249                 // sleep a little bit to avoid spinning, and then pretend that
250                 // we flushed one, so anyone blocked will check again
251                 Thread.sleep(1000);
252                 wakeUpIfBlocking();
253               }
254               // Enqueue another one of these tokens so we'll wake up again
255               wakeupFlushThread();
256             }
257             continue;
258           }
259           FlushRegionEntry fre = (FlushRegionEntry) fqe;
260           if (!flushRegion(fre)) {
261             break;
262           }
263         } catch (InterruptedException ex) {
264           continue;
265         } catch (ConcurrentModificationException ex) {
266           continue;
267         } catch (Exception ex) {
268           LOG.error("Cache flusher failed for entry " + fqe, ex);
269           if (!server.checkFileSystem()) {
270             break;
271           }
272         }
273       }
274       synchronized (regionsInQueue) {
275         regionsInQueue.clear();
276         flushQueue.clear();
277       }
278 
279       // Signal anyone waiting, so they see the close flag
280       wakeUpIfBlocking();
281       LOG.info(getName() + " exiting");
282     }
283   }
284 
285 
286   private void wakeupFlushThread() {
287     if (wakeupPending.compareAndSet(false, true)) {
288       flushQueue.add(new WakeupFlushThread());
289     }
290   }
291 
292   private HRegion getBiggestMemstoreRegion(
293       SortedMap<Long, HRegion> regionsBySize,
294       Set<HRegion> excludedRegions,
295       boolean checkStoreFileCount) {
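        // regionsBySize is expected to iterate regions with the largest memstores first,
        // so the first eligible region found below is the biggest one we can act on.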
296     synchronized (regionsInQueue) {
297       for (HRegion region : regionsBySize.values()) {
298         if (excludedRegions.contains(region)) {
299           continue;
300         }
301 
302         if (region.writestate.flushing || !region.writestate.writesEnabled) {
303           continue;
304         }
305 
306         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
307           continue;
308         }
309         return region;
310       }
311     }
312     return null;
313   }
314 
315   private HRegion getBiggestMemstoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize,
316       Set<HRegion> excludedRegions) {
317     synchronized (regionsInQueue) {
318       for (HRegion region : regionsBySize.values()) {
319         if (excludedRegions.contains(region)) {
320           continue;
321         }
322 
323         if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
324           continue;
325         }
326 
327         return region;
328       }
329     }
330     return null;
331   }
332 
333   private boolean refreshStoreFilesAndReclaimMemory(HRegion region) {
334     try {
335       return region.refreshStoreFiles();
336     } catch (IOException e) {
337       LOG.warn("Refreshing store files failed with exception", e);
338     }
339     return false;
340   }
341 
342   /**
343    * Return true if global memory usage is above the high watermark
344    */
345   private boolean isAboveHighWaterMark() {
346     return server.getRegionServerAccounting().
347       getGlobalMemstoreSize() >= globalMemStoreLimit;
348   }
349 
350   /**
351    * Return true if we're above the low watermark
352    */
353   private boolean isAboveLowWaterMark() {
354     return server.getRegionServerAccounting().
355       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
356   }
357 
358   @Override
359   public void requestFlush(HRegion r, boolean forceFlushAllStores) {
360     synchronized (regionsInQueue) {
361       if (!regionsInQueue.containsKey(r)) {
362         // This entry has no delay so it will be added at the top of the flush
363         // queue.  It'll come out near immediately.
364         FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
365         this.regionsInQueue.put(r, fqe);
366         this.flushQueue.add(fqe);
367       }
368     }
369   }
370 
371   @Override
372   public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
373     synchronized (regionsInQueue) {
374       if (!regionsInQueue.containsKey(r)) {
375         // This entry has some delay
376         FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
377         fqe.requeue(delay);
378         this.regionsInQueue.put(r, fqe);
379         this.flushQueue.add(fqe);
380       }
381     }
382   }
383 
384   public int getFlushQueueSize() {
385     return flushQueue.size();
386   }
387 
388   /**
389    * Only interrupt once it's done with a run through the work loop.
390    */
391   void interruptIfNecessary() {
392     lock.writeLock().lock();
393     try {
394       for (FlushHandler flushHandler : flushHandlers) {
395         if (flushHandler != null) flushHandler.interrupt();
396       }
397     } finally {
398       lock.writeLock().unlock();
399     }
400   }
401 
402   synchronized void start(UncaughtExceptionHandler eh) {
403     ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
404         server.getServerName().toShortString() + "-MemStoreFlusher", eh);
405     for (int i = 0; i < flushHandlers.length; i++) {
406       flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
407       flusherThreadFactory.newThread(flushHandlers[i]);
408       flushHandlers[i].start();
409     }
410   }
411 
412   boolean isAlive() {
413     for (FlushHandler flushHandler : flushHandlers) {
414       if (flushHandler != null && flushHandler.isAlive()) {
415         return true;
416       }
417     }
418     return false;
419   }
420 
421   void join() {
422     for (FlushHandler flushHandler : flushHandlers) {
423       if (flushHandler != null) {
424         Threads.shutdown(flushHandler.getThread());
425       }
426     }
427   }
428 
429   /**
430    * A flushRegion that checks store file count.  If too many, puts the flush
431    * on the delay queue to retry later.
432    * @param fqe the flush queue entry for the region to flush
433    * @return true if the region was successfully flushed, false otherwise. If
434    * false, there will be accompanying log messages explaining why the region was
435    * not flushed.
436    */
437   private boolean flushRegion(final FlushRegionEntry fqe) {
438     HRegion region = fqe.region;
439     if (!region.getRegionInfo().isMetaRegion() &&
440         isTooManyStoreFiles(region)) {
441       if (fqe.isMaximumWait(this.blockingWaitTime)) {
442         LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
443           "ms on a compaction to clean up 'too many store files'; waited " +
444           "long enough... proceeding with flush of " +
445           region.getRegionNameAsString());
446       } else {
447         // If this is the first time we've been put off, then emit a log message.
448         if (fqe.getRequeueCount() <= 0) {
449           // Note: We don't impose blockingStoreFiles constraint on meta regions
450           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
451             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
452           if (!this.server.compactSplitThread.requestSplit(region)) {
453             try {
454               this.server.compactSplitThread.requestSystemCompaction(
455                   region, Thread.currentThread().getName());
456             } catch (IOException e) {
457               e = e instanceof RemoteException ?
458                       ((RemoteException)e).unwrapRemoteException() : e;
459               LOG.error(
460                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
461                 e);
462             }
463           }
464         }
465 
466         // Put back on the queue.  Have it come back out of the queue
467         // after a delay of this.blockingWaitTime / 100 ms.
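          // (With the default hbase.hstore.blockingWaitTime of 90000 ms that works out
          // to a 900 ms delay between retries.)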
468         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
469         // Tell a lie, it's not flushed but it's ok
470         return true;
471       }
472     }
473     return flushRegion(region, false, fqe.isForceFlushAllStores());
474   }
475 
476   /**
477    * Flush a region.
478    * @param region Region to flush.
479    * @param emergencyFlush Set if we are being force flushed. If true the region
480    * needs to be removed from the flush queue. If false, we were called from the
481    * main flusher run loop and the entry was already removed from the flush queue
482    * by the poll that handed it to us.
483    * @param forceFlushAllStores whether we want to flush all stores.
484    * @return true if the region was successfully flushed, false otherwise. If
485    * false, there will be accompanying log messages explaining why the region was
486    * not flushed.
487    */
488   private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
489       boolean forceFlushAllStores) {
490     long startTime = 0;
491     synchronized (this.regionsInQueue) {
492       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
493       // Use the start time of the FlushRegionEntry if available
494       if (fqe != null) {
495         startTime = fqe.createTime;
496       }
497       if (fqe != null && emergencyFlush) {
498       // Need to remove the region from the delay queue.  When NOT an
499         // emergencyFlush, then item was removed via a flushQueue.poll.
500         flushQueue.remove(fqe);
501       }
502     }
503     if (startTime == 0) {
504       // Only fall back to the system time when there was no FlushRegionEntry; a
505       // shame we can't also capture the time spent in the above synchronized
506       // block
507       startTime = EnvironmentEdgeManager.currentTime();
508     }
509     lock.readLock().lock();
510     try {
511       notifyFlushRequest(region, emergencyFlush);
512       HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
513       boolean shouldCompact = flushResult.isCompactionNeeded();
514       // We just want to check the size
515       boolean shouldSplit = region.checkSplit() != null;
516       if (shouldSplit) {
517         this.server.compactSplitThread.requestSplit(region);
518       } else if (shouldCompact) {
519         server.compactSplitThread.requestSystemCompaction(
520             region, Thread.currentThread().getName());
521       }
522       if (flushResult.isFlushSucceeded()) {
523         long endTime = EnvironmentEdgeManager.currentTime();
524         server.metricsRegionServer.updateFlushTime(endTime - startTime);
525       }
526     } catch (DroppedSnapshotException ex) {
527       // Cache flush can fail in a few places. If it fails in a critical
528       // section, we get a DroppedSnapshotException and a replay of wal
529       // is required. Currently the only way to do this is a restart of
530       // the server. Abort because hdfs is probably bad (HBASE-644 is a case
531       // where hdfs was bad but passed the hdfs check).
532       server.abort("Replay of WAL required. Forcing server shutdown", ex);
533       return false;
534     } catch (IOException ex) {
535       ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
536       LOG.error(
537         "Cache flush failed"
538             + (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName()))
539                 : ""), ex);
540       if (!server.checkFileSystem()) {
541         return false;
542       }
543     } finally {
544       lock.readLock().unlock();
545       wakeUpIfBlocking();
546     }
547     return true;
548   }
549 
550   private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
551     FlushType type = FlushType.NORMAL;
552     if (emergencyFlush) {
553       type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
554     }
555     for (FlushRequestListener listener : flushRequestListeners) {
556       listener.flushRequested(type, region);
557     }
558   }
559 
560   private void wakeUpIfBlocking() {
561     synchronized (blockSignal) {
562       blockSignal.notifyAll();
563     }
564   }
565 
566   private boolean isTooManyStoreFiles(HRegion region) {
567     for (Store store : region.stores.values()) {
568       if (store.hasTooManyStoreFiles()) {
569         return true;
570       }
571     }
572     return false;
573   }
574 
575   /**
576    * Check if the regionserver's memstore memory usage is greater than the
577    * limit. If so, flush regions with the biggest memstores until we're down
578    * to the lower limit. This method blocks callers until we're down to a safe
579    * amount of memstore consumption.
580    */
581   public void reclaimMemStoreMemory() {
582     TraceScope scope = Trace.startSpan("MemStoreFlusher.reclaimMemStoreMemory");
583     if (isAboveHighWaterMark()) {
584       if (Trace.isTracing()) {
585         scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
586       }
587       long start = EnvironmentEdgeManager.currentTime();
588       synchronized (this.blockSignal) {
589         boolean blocked = false;
590         long startTime = 0;
591         boolean interrupted = false;
592         try {
593           while (isAboveHighWaterMark() && !server.isStopped()) {
594             if (!blocked) {
595               startTime = EnvironmentEdgeManager.currentTime();
596               LOG.info("Blocking updates on "
597                   + server.toString()
598                   + ": the global memstore size "
599                   + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
600                       .getGlobalMemstoreSize(), "", 1) + " is >= the blocking limit "
601                   + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1));
602             }
603             blocked = true;
604             wakeupFlushThread();
605             try {
606               // we should be able to wait forever, but we've seen a bug where
607               // we miss a notify, so put a 5 second bound on it at least.
608               blockSignal.wait(5 * 1000);
609             } catch (InterruptedException ie) {
610               LOG.warn("Interrupted while waiting");
611               interrupted = true;
612             }
613             long took = EnvironmentEdgeManager.currentTime() - start;
614             LOG.warn("Memstore is above high water mark; updates blocked for " + took + "ms");
615           }
616         } finally {
617           if (interrupted) {
618             Thread.currentThread().interrupt();
619           }
620         }
621 
622         if (blocked) {
623           final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
624           if (totalTime > 0) {
625             this.updatesBlockedMsHighWater.add(totalTime);
626           }
627           LOG.info("Unblocking updates for server " + server.toString());
628         }
629       }
630     } else if (isAboveLowWaterMark()) {
631       wakeupFlushThread();
632     }
633     scope.close();
634   }
635   @Override
636   public String toString() {
637     return "flush_queue="
638         + flushQueue.size();
639   }
640 
641   public String dumpQueue() {
642     StringBuilder queueList = new StringBuilder();
643     queueList.append("Flush Queue dump:\n");
644     queueList.append("  Flush Queue:\n");
645     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
646 
647     while (it.hasNext()) {
648       queueList.append("    " + it.next().toString());
649       queueList.append("\n");
650     }
651 
652     return queueList.toString();
653   }
654 
655   /**
656    * Register a FlushRequestListener
657    * @param listener the listener to register
658    */
659   @Override
660   public void registerFlushRequestListener(final FlushRequestListener listener) {
661     this.flushRequestListeners.add(listener);
662   }
663 
664   /**
665    * Unregister the given FlushRequestListener
666    * @param listener the listener to unregister
667    * @return true when the passed listener is unregistered successfully.
668    */
669   @Override
670   public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
671     return this.flushRequestListeners.remove(listener);
672   }
673 
674   /**
675    * Sets the global memstore limit to a new size.
676    * @param globalMemStoreSize the new global memstore limit, in bytes
677    */
678   @Override
679   public void setGlobalMemstoreLimit(long globalMemStoreSize) {
680     this.globalMemStoreLimit = globalMemStoreSize;
681     this.globalMemStoreLimitLowMark =
682         (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
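        // Re-check memory pressure right away so a lowered limit takes effect immediately.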
683     reclaimMemStoreMemory();
684   }
685 
686   public long getMemoryLimit() {
687     return this.globalMemStoreLimit;
688   }
689 
690   interface FlushQueueEntry extends Delayed {
691   }
692 
693   /**
694    * Token to insert into the flush queue that ensures that the flusher does not sleep
695    */
696   static class WakeupFlushThread implements FlushQueueEntry {
697     @Override
698     public long getDelay(TimeUnit unit) {
699       return 0;
700     }
701 
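        // Always claims to be less than any other entry, so a wakeup token tends to sort
        // to the head of the delay queue.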
702     @Override
703     public int compareTo(Delayed o) {
704       return -1;
705     }
706 
707     @Override
708     public boolean equals(Object obj) {
709       return (this == obj);
710     }
711   }
712 
713   /**
714    * Data structure used in the flush queue.  Holds region and retry count.
715    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
716    * construction, the delay is zero. When added to a delay queue, we'll come
717    * out near immediately.  Call {@link #requeue(long)} passing delay in
718    * milliseconds before re-adding to the delay queue if you want it to stay there
719    * a while.
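       *
       * <p>Illustrative use, mirroring {@link MemStoreFlusher#requestDelayedFlush}:
       * <pre>
       *   FlushRegionEntry fqe = new FlushRegionEntry(region, false);
       *   fqe.requeue(delayMs);   // come back out of the DelayQueue after delayMs
       *   flushQueue.add(fqe);
       * </pre>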
720    */
721   static class FlushRegionEntry implements FlushQueueEntry {
722     private final HRegion region;
723 
724     private final long createTime;
725     private long whenToExpire;
726     private int requeueCount = 0;
727 
728     private boolean forceFlushAllStores;
729 
730     FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) {
731       this.region = r;
732       this.createTime = EnvironmentEdgeManager.currentTime();
733       this.whenToExpire = this.createTime;
734       this.forceFlushAllStores = forceFlushAllStores;
735     }
736 
737     /**
738      * @param maximumWait
739      * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
740      */
741     public boolean isMaximumWait(final long maximumWait) {
742       return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
743     }
744 
745     /**
746      * @return Count of times {@link #requeue(long)} was called; i.e. this is the
747      * number of times we've been requeued.
748      */
749     public int getRequeueCount() {
750       return this.requeueCount;
751     }
752 
753     /**
754      * @return whether we need to flush all stores.
755      */
756     public boolean isForceFlushAllStores() {
757       return forceFlushAllStores;
758     }
759 
760     /**
761      * @param when When to expire, when to come up out of the queue.
762      * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
763      * to whatever you pass.
764      * @return This.
765      */
766     public FlushRegionEntry requeue(final long when) {
767       this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
768       this.requeueCount++;
769       return this;
770     }
771 
772     @Override
773     public long getDelay(TimeUnit unit) {
774       return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
775           TimeUnit.MILLISECONDS);
776     }
777 
778     @Override
779     public int compareTo(Delayed other) {
780       // Delay is compared first. If there is a tie, compare region's hash code
781       int ret = Long.compare(getDelay(TimeUnit.MILLISECONDS),
782           other.getDelay(TimeUnit.MILLISECONDS));
783       if (ret != 0) {
784         return ret;
785       }
786       FlushQueueEntry otherEntry = (FlushQueueEntry) other;
787       return hashCode() - otherEntry.hashCode();
788     }
789 
790     @Override
791     public String toString() {
792       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
793     }
794 
795     @Override
796     public int hashCode() {
797       int hash = (int) getDelay(TimeUnit.MILLISECONDS);
798       return hash ^ region.hashCode();
799     }
800 
801     @Override
802     public boolean equals(Object obj) {
803       if (this == obj) {
804         return true;
805       }
806       if (obj == null || getClass() != obj.getClass()) {
807         return false;
808       }
809       Delayed other = (Delayed) obj;
810       return compareTo(other) == 0;
811     }
812   }
813 }
814 
815 enum FlushType {
816   NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
817 }