
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.util.StringUtils.humanReadableInt;
22  
23  import java.io.IOException;
24  import java.lang.Thread.UncaughtExceptionHandler;
25  import java.lang.management.ManagementFactory;
26  import java.util.ArrayList;
27  import java.util.ConcurrentModificationException;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.SortedMap;
34  import java.util.concurrent.BlockingQueue;
35  import java.util.concurrent.DelayQueue;
36  import java.util.concurrent.Delayed;
37  import java.util.concurrent.ThreadFactory;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.locks.ReentrantReadWriteLock;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.DroppedSnapshotException;
46  import org.apache.hadoop.hbase.HConstants;
47  import org.apache.hadoop.hbase.RemoteExceptionHandler;
48  import org.apache.hadoop.hbase.classification.InterfaceAudience;
49  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
50  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
51  import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54  import org.apache.hadoop.hbase.util.HasThread;
55  import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
56  import org.apache.hadoop.hbase.util.Threads;
57  import org.apache.hadoop.util.StringUtils;
58  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
59  import org.apache.htrace.Trace;
60  import org.apache.htrace.TraceScope;
61  import org.apache.hadoop.hbase.util.Counter;
62  
63  import com.google.common.base.Preconditions;
64  
65  /**
66   * Flushes the memstore on request, using a pool of FlushHandler threads.
67   *
68   * NOTE: The flushing work runs in dedicated threads rather than in a Chore
69   * because the sleep time can be interrupted when there is something to do,
70   * whereas the Chore sleep time is invariant.
71   *
72   * @see FlushRequester
73   */
74  @InterfaceAudience.Private
75  class MemStoreFlusher implements FlushRequester {
76    static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
77  
78    private Configuration conf;
79    // These two data members go together.  Any entry in the one must have
80    // a corresponding entry in the other.
81    private final BlockingQueue<FlushQueueEntry> flushQueue =
82      new DelayQueue<FlushQueueEntry>();
83    private final Map<Region, FlushRegionEntry> regionsInQueue =
84      new HashMap<Region, FlushRegionEntry>();
85    private AtomicBoolean wakeupPending = new AtomicBoolean();
86  
87    private final long threadWakeFrequency;
88    private final HRegionServer server;
89    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
90    private final Object blockSignal = new Object();
91  
92    protected long globalMemStoreLimit;
93    protected float globalMemStoreLimitLowMarkPercent;
94    protected long globalMemStoreLimitLowMark;
95  
96    private long blockingWaitTime;
97    private final Counter updatesBlockedMsHighWater = new Counter();
98  
99    private final FlushHandler[] flushHandlers;
100   private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
101 
102   /**
103    * @param conf the region server configuration
104    * @param server the region server this flusher belongs to
105    */
106   public MemStoreFlusher(final Configuration conf,
107       final HRegionServer server) {
108     super();
109     this.conf = conf;
110     this.server = server;
111     this.threadWakeFrequency =
112       conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
113     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
114     float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
115     this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
116     this.globalMemStoreLimitLowMarkPercent =
117         HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
118     this.globalMemStoreLimitLowMark =
119         (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
120 
121     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
122       90000);
123     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
124     this.flushHandlers = new FlushHandler[handlerCount];
125     LOG.info("globalMemStoreLimit="
126         + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
127         + ", globalMemStoreLimitLowMark="
128         + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
129         + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
130   }
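  /*
   * A minimal worked example of the limit arithmetic in the constructor above,
   * assuming a 4 GB heap and the stock defaults read by HeapMemorySizeUtil
   * (a global memstore fraction of 0.4 and a lower-mark fraction of 0.95; both
   * figures are assumptions here, the real values come from the Configuration):
   *
   *   long max = 4L * 1024 * 1024 * 1024;          // heap max from the MemoryMXBean
   *   long globalLimit = (long) (max * 0.4f);      // ~1.6 GB: updates block at or above this
   *   long lowMark = (long) (globalLimit * 0.95f); // ~1.52 GB: background flushing starts here
   *
   * Between lowMark and globalLimit the flush handlers pick regions to flush;
   * at or above globalLimit, reclaimMemStoreMemory() blocks writers until the
   * usage drops back below the high-water mark.
   */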
131 
132   public Counter getUpdatesBlockedMsHighWater() {
133     return this.updatesBlockedMsHighWater;
134   }
135 
136   /**
137    * The memstore across all regions has exceeded the low water mark. Pick
138    * one region to flush and flush it synchronously (this is called from the
139    * flush thread)
140    * @return true if a region was successfully flushed or had its store files refreshed, false if no candidate region could be found
141    */
142   private boolean flushOneForGlobalPressure() {
143     SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
144     Set<Region> excludedRegions = new HashSet<Region>();
145 
146     double secondaryMultiplier
147       = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
148 
149     boolean flushedOne = false;
150     while (!flushedOne) {
151       // Find the biggest region that doesn't have too many storefiles
152       // (might be null!)
153       Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true);
154       // Find the biggest region overall, even if it might have too many store files.
155       Region bestAnyRegion = getBiggestMemstoreRegion(
156           regionsBySize, excludedRegions, false);
157       // Find the biggest region that is a secondary region
158       Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
159         excludedRegions);
160 
161       if (bestAnyRegion == null && bestRegionReplica == null) {
162         LOG.error("Above memory mark but there are no flushable regions!");
163         return false;
164       }
165 
166       Region regionToFlush;
167       if (bestFlushableRegion != null &&
168           bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) {
169         // Even if it's not supposed to be flushed, pick a region if it's more than twice
170         // as big as the best flushable one - otherwise when we're under pressure we make
171         // lots of little flushes and cause lots of compactions, etc, which just makes
172         // life worse!
173         if (LOG.isDebugEnabled()) {
174           LOG.debug("Under global heap pressure: " + "Region "
175               + bestAnyRegion.getRegionInfo().getRegionNameAsString()
176               + " has too many " + "store files, but is "
177               + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemstoreSize(), "", 1)
178               + " vs best flushable region's "
179               + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemstoreSize(), "", 1)
180               + ". Choosing the bigger.");
181         }
182         regionToFlush = bestAnyRegion;
183       } else {
184         if (bestFlushableRegion == null) {
185           regionToFlush = bestAnyRegion;
186         } else {
187           regionToFlush = bestFlushableRegion;
188         }
189       }
190 
191       Preconditions.checkState(
192         (regionToFlush != null && regionToFlush.getMemstoreSize() > 0) ||
193         (bestRegionReplica != null && bestRegionReplica.getMemstoreSize() > 0));
194 
195       if (regionToFlush == null ||
196           (bestRegionReplica != null &&
197            ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
198            (bestRegionReplica.getMemstoreSize()
199                > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
200         LOG.info("Refreshing storefiles of region " + bestRegionReplica +
201           " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
202             server.getRegionServerAccounting().getGlobalMemstoreSize()));
203         flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
204         if (!flushedOne) {
205           LOG.info("Excluding secondary region " + bestRegionReplica +
206               " - trying to find a different region to refresh files.");
207           excludedRegions.add(bestRegionReplica);
208         }
209       } else {
210         LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
211             + "Total Memstore size="
212             + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
213             + ", Region memstore size="
214             + humanReadableInt(regionToFlush.getMemstoreSize()));
215         flushedOne = flushRegion(regionToFlush, true, true);
216 
217         if (!flushedOne) {
218           LOG.info("Excluding unflushable region " + regionToFlush +
219               " - trying to find a different region to flush.");
220           excludedRegions.add(regionToFlush);
221         }
222       }
223     }
224     return true;
225   }
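  /*
   * A small numeric sketch of the selection heuristic above (the figures are
   * illustrative only): if the biggest flushable region holds 40 MB of memstore
   * while a region with too many store files holds 100 MB, the 100 MB region is
   * chosen because 100 MB > 2 * 40 MB; flushing the smaller region would relieve
   * little pressure while still adding compaction work.  Likewise, a secondary
   * region replica is only picked for a store file refresh when its memstore
   * exceeds secondaryMultiplier times that of the chosen region (the multiplier
   * defaults to a small factor, 4 at the time of writing, but treat that as an
   * assumption and check ServerRegionReplicaUtil for the authoritative value).
   */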
226 
227   private class FlushHandler extends HasThread {
228 
229     private FlushHandler(String name) {
230       super(name);
231     }
232 
233     @Override
234     public void run() {
235       while (!server.isStopped()) {
236         FlushQueueEntry fqe = null;
237         try {
238           wakeupPending.set(false); // allow someone to wake us up again
239           fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
240           if (fqe == null || fqe instanceof WakeupFlushThread) {
241             if (isAboveLowWaterMark()) {
242               LOG.debug("Flush thread woke up because memory above low water="
243                   + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
244               if (!flushOneForGlobalPressure()) {
245                 // Wasn't able to flush any region, but we're above low water mark
246                 // This is unlikely to happen, but might happen when closing the
247                 // entire server - another thread is flushing regions. We'll just
248                 // sleep a little bit to avoid spinning, and then pretend that
249                 // we flushed one, so anyone blocked will check again
250                 Thread.sleep(1000);
251                 wakeUpIfBlocking();
252               }
253               // Enqueue another one of these tokens so we'll wake up again
254               wakeupFlushThread();
255             }
256             continue;
257           }
258           FlushRegionEntry fre = (FlushRegionEntry) fqe;
259           if (!flushRegion(fre)) {
260             break;
261           }
262         } catch (InterruptedException ex) {
263           continue;
264         } catch (ConcurrentModificationException ex) {
265           continue;
266         } catch (Exception ex) {
267           LOG.error("Cache flusher failed for entry " + fqe, ex);
268           if (!server.checkFileSystem()) {
269             break;
270           }
271         }
272       }
273       synchronized (regionsInQueue) {
274         regionsInQueue.clear();
275         flushQueue.clear();
276       }
277 
278       // Signal anyone waiting, so they see the close flag
279       wakeUpIfBlocking();
280       LOG.info(getName() + " exiting");
281     }
282   }
283 
284 
285   private void wakeupFlushThread() {
286     if (wakeupPending.compareAndSet(false, true)) {
287       flushQueue.add(new WakeupFlushThread());
288     }
289   }
290 
291   private Region getBiggestMemstoreRegion(
292       SortedMap<Long, Region> regionsBySize,
293       Set<Region> excludedRegions,
294       boolean checkStoreFileCount) {
295     synchronized (regionsInQueue) {
296       for (Region region : regionsBySize.values()) {
297         if (excludedRegions.contains(region)) {
298           continue;
299         }
300 
301         if (((HRegion)region).writestate.flushing ||
302             !((HRegion)region).writestate.writesEnabled) {
303           continue;
304         }
305 
306         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
307           continue;
308         }
309         return region;
310       }
311     }
312     return null;
313   }
314 
315   private Region getBiggestMemstoreOfRegionReplica(SortedMap<Long, Region> regionsBySize,
316       Set<Region> excludedRegions) {
317     synchronized (regionsInQueue) {
318       for (Region region : regionsBySize.values()) {
319         if (excludedRegions.contains(region)) {
320           continue;
321         }
322 
323         if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
324           continue;
325         }
326 
327         return region;
328       }
329     }
330     return null;
331   }
332 
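  /**
   * A secondary region replica cannot flush (only the primary writes files), so
   * under memory pressure we instead refresh its store files; assuming the usual
   * read-replica setup, the replica then picks up what the primary has already
   * flushed and can release the corresponding memstore content, reclaiming
   * memory without writing anything itself.
   */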
333   private boolean refreshStoreFilesAndReclaimMemory(Region region) {
334     try {
335       return region.refreshStoreFiles();
336     } catch (IOException e) {
337       LOG.warn("Refreshing store files failed with exception", e);
338     }
339     return false;
340   }
341 
342   /**
343    * Return true if global memory usage is above the high watermark
344    */
345   private boolean isAboveHighWaterMark() {
346     return server.getRegionServerAccounting().
347       getGlobalMemstoreSize() >= globalMemStoreLimit;
348   }
349 
350   /**
351    * Return true if we're above the low watermark
352    */
353   private boolean isAboveLowWaterMark() {
354     return server.getRegionServerAccounting().
355       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
356   }
357 
358   @Override
359   public void requestFlush(Region r, boolean forceFlushAllStores) {
360     synchronized (regionsInQueue) {
361       if (!regionsInQueue.containsKey(r)) {
362         // This entry has no delay so it will be added at the top of the flush
363         // queue.  It'll come out near immediately.
364         FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
365         this.regionsInQueue.put(r, fqe);
366         this.flushQueue.add(fqe);
367       }
368     }
369   }
370 
371   @Override
372   public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
373     synchronized (regionsInQueue) {
374       if (!regionsInQueue.containsKey(r)) {
375         // This entry has some delay
376         FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
377         fqe.requeue(delay);
378         this.regionsInQueue.put(r, fqe);
379         this.flushQueue.add(fqe);
380       }
381     }
382   }
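  /*
   * A hedged usage sketch of the two request methods above, as a caller holding
   * a Region reference might invoke them (the flusher lookup and the chosen
   * delay are hypothetical, not taken from this file):
   *
   *   FlushRequester flusher = rsServices.getFlushRequester();
   *   flusher.requestFlush(region, false);                  // enqueue an immediate flush
   *   flusher.requestDelayedFlush(region, 60 * 1000, true); // flush all stores in ~60 seconds
   *
   * Both methods are no-ops while a request for the same region is already
   * queued; regionsInQueue guards against duplicate entries.
   */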
383 
384   public int getFlushQueueSize() {
385     return flushQueue.size();
386   }
387 
388   /**
389    * Only interrupt once it's done with a run through the work loop.
390    */
391   void interruptIfNecessary() {
392     lock.writeLock().lock();
393     try {
394       for (FlushHandler flushHander : flushHandlers) {
395         if (flushHander != null) flushHander.interrupt();
396       }
397     } finally {
398       lock.writeLock().unlock();
399     }
400   }
401 
402   synchronized void start(UncaughtExceptionHandler eh) {
403     ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
404         server.getServerName().toShortString() + "-MemStoreFlusher", eh);
405     for (int i = 0; i < flushHandlers.length; i++) {
406       flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
407       flusherThreadFactory.newThread(flushHandlers[i]);
408       flushHandlers[i].start();
409     }
410   }
411 
412   boolean isAlive() {
413     for (FlushHandler flushHander : flushHandlers) {
414       if (flushHander != null && flushHander.isAlive()) {
415         return true;
416       }
417     }
418     return false;
419   }
420 
421   void join() {
422     for (FlushHandler flushHander : flushHandlers) {
423       if (flushHander != null) {
424         Threads.shutdown(flushHander.getThread());
425       }
426     }
427   }
428 
429   /**
430    * A flushRegion that checks store file count.  If too many, puts the flush
431    * on delay queue to retry later.
432    * @param fqe the flush request pulled off the flush queue
433    * @return true if the region was successfully flushed, false otherwise. If
434    * false, there will be accompanying log messages explaining why the region was
435    * not flushed.
436    */
437   private boolean flushRegion(final FlushRegionEntry fqe) {
438     Region region = fqe.region;
439     if (!region.getRegionInfo().isMetaRegion() &&
440         isTooManyStoreFiles(region)) {
441       if (fqe.isMaximumWait(this.blockingWaitTime)) {
442         LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
443           "ms on a compaction to clean up 'too many store files'; waited " +
444           "long enough... proceeding with flush of " +
445           region.getRegionInfo().getRegionNameAsString());
446       } else {
447         // If this is first time we've been put off, then emit a log message.
448         if (fqe.getRequeueCount() <= 0) {
449           // Note: We don't impose blockingStoreFiles constraint on meta regions
450           LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + " has too many " +
451             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
452           if (!this.server.compactSplitThread.requestSplit(region)) {
453             try {
454               this.server.compactSplitThread.requestSystemCompaction(
455                   region, Thread.currentThread().getName());
456             } catch (IOException e) {
457               LOG.error("Cache flush failed for region " +
458                   Bytes.toStringBinary(region.getRegionInfo().getRegionName()),
459                 RemoteExceptionHandler.checkIOException(e));
460             }
461           }
462         }
463 
464         // Put back on the queue.  Have it come back out of the queue
465         // after a delay of this.blockingWaitTime / 100 ms.
466         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
467         // Tell a lie, it's not flushed but it's ok
468         return true;
469       }
470     }
471     return flushRegion(region, false, fqe.isForceFlushAllStores());
472   }
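  /*
   * Worked example of the requeue backoff above, using the default
   * hbase.hstore.blockingWaitTime of 90000 ms read in the constructor: a region
   * with too many store files goes back on the delay queue with a delay of
   * 90000 / 100 = 900 ms, so the flusher re-examines it roughly every 0.9
   * seconds until either a compaction or split clears the backlog, or the
   * 90 second maximum wait expires and the flush proceeds anyway.
   */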
473 
474   /**
475    * Flush a region.
476    * @param region Region to flush.
477    * @param emergencyFlush Set if we are being force flushed. If true, the region
478    * needs to be removed from the flush queue. If false, we were called from the
479    * main flusher run loop and the entry was already removed from the flush queue
480    * by the poll that handed it to us.
481    * @param forceFlushAllStores whether we want to flush all stores.
482    * @return true if the region was successfully flushed, false otherwise. If
483    * false, there will be accompanying log messages explaining why the region was
484    * not flushed.
485    */
486   private boolean flushRegion(final Region region, final boolean emergencyFlush,
487       boolean forceFlushAllStores) {
488     long startTime = 0;
489     synchronized (this.regionsInQueue) {
490       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
491       // Use the start time of the FlushRegionEntry if available
492       if (fqe != null) {
493         startTime = fqe.createTime;
494       }
495       if (fqe != null && emergencyFlush) {
496         // Need to remove the region from the delay queue.  When NOT an
497         // emergencyFlush, the item was already removed via a flushQueue.poll.
498         flushQueue.remove(fqe);
499       }
500     }
501     if (startTime == 0) {
502       // Only fetch the system time when we did not have a FlushRegionEntry; a
503       // shame we can't also capture the time spent in the above synchronized
504       // block
505       startTime = EnvironmentEdgeManager.currentTime();
506     }
507     lock.readLock().lock();
508     try {
509       notifyFlushRequest(region, emergencyFlush);
510       FlushResult flushResult = region.flush(forceFlushAllStores);
511       boolean shouldCompact = flushResult.isCompactionNeeded();
512       // We just want to check the size
513       boolean shouldSplit = ((HRegion)region).checkSplit() != null;
514       if (shouldSplit) {
515         this.server.compactSplitThread.requestSplit(region);
516       } else if (shouldCompact) {
517         server.compactSplitThread.requestSystemCompaction(
518             region, Thread.currentThread().getName());
519       }
520       if (flushResult.isFlushSucceeded()) {
521         long endTime = EnvironmentEdgeManager.currentTime();
522         server.metricsRegionServer.updateFlushTime(endTime - startTime);
523       }
524     } catch (DroppedSnapshotException ex) {
525       // Cache flush can fail in a few places. If it fails in a critical
526       // section, we get a DroppedSnapshotException and a replay of wal
527       // is required. Currently the only way to do this is a restart of
528       // the server. Abort because hdfs is probably bad (HBASE-644 is a case
529       // where hdfs was bad but passed the hdfs check).
530       server.abort("Replay of WAL required. Forcing server shutdown", ex);
531       return false;
532     } catch (IOException ex) {
533       LOG.error("Cache flush failed" + (region != null ? (" for region " +
534           Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
535         RemoteExceptionHandler.checkIOException(ex));
536       if (!server.checkFileSystem()) {
537         return false;
538       }
539     } finally {
540       lock.readLock().unlock();
541       wakeUpIfBlocking();
542     }
543     return true;
544   }
545 
546   private void notifyFlushRequest(Region region, boolean emergencyFlush) {
547     FlushType type = FlushType.NORMAL;
548     if (emergencyFlush) {
549       type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
550     }
551     for (FlushRequestListener listener : flushRequestListeners) {
552       listener.flushRequested(type, region);
553     }
554   }
555 
556   private void wakeUpIfBlocking() {
557     synchronized (blockSignal) {
558       blockSignal.notifyAll();
559     }
560   }
561 
562   private boolean isTooManyStoreFiles(Region region) {
563     for (Store store : region.getStores()) {
564       if (store.hasTooManyStoreFiles()) {
565         return true;
566       }
567     }
568     return false;
569   }
570 
571   /**
572    * Check if the regionserver's memstore memory usage is greater than the
573    * limit. If so, flush regions with the biggest memstores until we're down
574    * to the lower limit. This method blocks callers until we're down to a safe
575    * amount of memstore consumption.
576    */
577   public void reclaimMemStoreMemory() {
578     TraceScope scope = Trace.startSpan("MemStoreFlusher.reclaimMemStoreMemory");
579     if (isAboveHighWaterMark()) {
580       if (Trace.isTracing()) {
581         scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
582       }
583       long start = EnvironmentEdgeManager.currentTime();
584       synchronized (this.blockSignal) {
585         boolean blocked = false;
586         long startTime = 0;
587         boolean interrupted = false;
588         try {
589           while (isAboveHighWaterMark() && !server.isStopped()) {
590             if (!blocked) {
591               startTime = EnvironmentEdgeManager.currentTime();
592               LOG.info("Blocking updates on "
593                   + server.toString()
594                   + ": the global memstore size "
595                   + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
596                       .getGlobalMemstoreSize(), "", 1) + " is >= the blocking size "
597                   + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1));
598             }
599             blocked = true;
600             wakeupFlushThread();
601             try {
602               // we should be able to wait forever, but we've seen a bug where
603               // we miss a notify, so put a 5 second bound on it at least.
604               blockSignal.wait(5 * 1000);
605             } catch (InterruptedException ie) {
606               LOG.warn("Interrupted while waiting");
607               interrupted = true;
608             }
609             long took = EnvironmentEdgeManager.currentTime() - start;
610             LOG.warn("Memstore is above high water mark; updates have been blocked for " + took + "ms");
611           }
612         } finally {
613           if (interrupted) {
614             Thread.currentThread().interrupt();
615           }
616         }
617 
618         if (blocked) {
619           final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
620           if (totalTime > 0) {
621             this.updatesBlockedMsHighWater.add(totalTime);
622           }
623           LOG.info("Unblocking updates for server " + server.toString());
624         }
625       }
626     } else if (isAboveLowWaterMark()) {
627       wakeupFlushThread();
628     }
629     scope.close();
630   }
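  /*
   * A hedged sketch of how a write path might use reclaimMemStoreMemory() to
   * apply back-pressure before accepting more edits (the method and variable
   * names here are hypothetical, not part of this class):
   *
   *   void beforeBatchMutate(MemStoreFlusher flusher) {
   *     flusher.reclaimMemStoreMemory(); // blocks while global memstore >= high-water mark
   *     // ... proceed to apply the batch to the region's memstore ...
   *   }
   *
   * Between the low and high water marks the call merely wakes the flush thread
   * and returns immediately, so normal writes are not delayed.
   */
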
631   @Override
632   public String toString() {
633     return "flush_queue="
634         + flushQueue.size();
635   }
636 
637   public String dumpQueue() {
638     StringBuilder queueList = new StringBuilder();
639     queueList.append("Flush Queue dump:\n");
640     queueList.append("  Flush Queue:\n");
641     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
642 
643     while (it.hasNext()) {
644       queueList.append("    " + it.next().toString());
645       queueList.append("\n");
646     }
647 
648     return queueList.toString();
649   }
650 
651   /**
652    * Register a FlushRequestListener
653    * @param listener the listener to notify when a flush is requested
654    */
655   @Override
656   public void registerFlushRequestListener(final FlushRequestListener listener) {
657     this.flushRequestListeners.add(listener);
658   }
659 
660   /**
661    * Unregister the given FlushRequestListener
662    * @param listener the listener to remove
663    * @return true when passed listener is unregistered successfully.
664    */
665   @Override
666   public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
667     return this.flushRequestListeners.remove(listener);
668   }
669 
670   /**
671    * Sets the global memstore limit to a new size.
672    * @param globalMemStoreSize the new global memstore limit, in bytes
673    */
674   @Override
675   public void setGlobalMemstoreLimit(long globalMemStoreSize) {
676     this.globalMemStoreLimit = globalMemStoreSize;
677     this.globalMemStoreLimitLowMark =
678         (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
679     reclaimMemStoreMemory();
680   }
681 
682   public long getMemoryLimit() {
683     return this.globalMemStoreLimit;
684   }
685 
686   interface FlushQueueEntry extends Delayed {
687   }
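  /*
   * FlushQueueEntry extends java.util.concurrent.Delayed so that flushQueue, a
   * DelayQueue, only hands entries to the flush handlers once their delay has
   * elapsed.  A minimal sketch of that contract (region stands in for any
   * online region):
   *
   *   DelayQueue<FlushQueueEntry> q = new DelayQueue<FlushQueueEntry>();
   *   FlushRegionEntry e = new FlushRegionEntry(region, false); // delay is 0 on construction
   *   q.add(e.requeue(900));                                    // invisible to poll() for ~900 ms
   *   FlushQueueEntry next = q.poll(10 * 1000, TimeUnit.MILLISECONDS);
   *   // poll() blocks until the 900 ms delay expires and then returns e,
   *   // or returns null if the 10 second timeout elapses first
   */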
688 
689   /**
690    * Token to insert into the flush queue that ensures that the flusher does not sleep
691    */
692   static class WakeupFlushThread implements FlushQueueEntry {
693     @Override
694     public long getDelay(TimeUnit unit) {
695       return 0;
696     }
697 
698     @Override
699     public int compareTo(Delayed o) {
700       return -1;
701     }
702 
703     @Override
704     public boolean equals(Object obj) {
705       return (this == obj);
706     }
707   }
708 
709   /**
710    * Data structure used in the flush queue.  Holds region and retry count.
711    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
712    * construction, the delay is zero. When added to a delay queue, we'll come
713    * out near immediately.  Call {@link #requeue(long)} passing a delay in
714    * milliseconds before re-adding it to the delay queue if you want it to stay
715    * there a while.
716    */
717   static class FlushRegionEntry implements FlushQueueEntry {
718     private final Region region;
719 
720     private final long createTime;
721     private long whenToExpire;
722     private int requeueCount = 0;
723 
724     private boolean forceFlushAllStores;
725 
726     FlushRegionEntry(final Region r, boolean forceFlushAllStores) {
727       this.region = r;
728       this.createTime = EnvironmentEdgeManager.currentTime();
729       this.whenToExpire = this.createTime;
730       this.forceFlushAllStores = forceFlushAllStores;
731     }
732 
733     /**
734      * @param maximumWait the maximum time to wait, in milliseconds
735      * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
736      */
737     public boolean isMaximumWait(final long maximumWait) {
738       return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
739     }
740 
741     /**
742      * @return Count of times {@link #requeue(long)} was called; i.e. this is the
743      * number of times we've been requeued.
744      */
745     public int getRequeueCount() {
746       return this.requeueCount;
747     }
748 
749     /**
750      * @return whether we need to flush all stores.
751      */
752     public boolean isForceFlushAllStores() {
753       return forceFlushAllStores;
754     }
755 
756     /**
757      * @param when When to expire, when to come up out of the queue.
758      * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
759      * to whatever you pass.
760      * @return This.
761      */
762     public FlushRegionEntry requeue(final long when) {
763       this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
764       this.requeueCount++;
765       return this;
766     }
767 
768     @Override
769     public long getDelay(TimeUnit unit) {
770       return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
771           TimeUnit.MILLISECONDS);
772     }
773 
774     @Override
775     public int compareTo(Delayed other) {
776       // Delay is compared first. If there is a tie, compare region's hash code
777       int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
778         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
779       if (ret != 0) {
780         return ret;
781       }
782       FlushQueueEntry otherEntry = (FlushQueueEntry) other;
783       return hashCode() - otherEntry.hashCode();
784     }
785 
786     @Override
787     public String toString() {
788       return "[flush region " + Bytes.toStringBinary(region.getRegionInfo().getRegionName()) + "]";
789     }
790 
791     @Override
792     public int hashCode() {
793       int hash = (int) getDelay(TimeUnit.MILLISECONDS);
794       return hash ^ region.hashCode();
795     }
796 
797     @Override
798     public boolean equals(Object obj) {
799       if (this == obj) {
800         return true;
801       }
802       if (obj == null || getClass() != obj.getClass()) {
803         return false;
804       }
805       Delayed other = (Delayed) obj;
806       return compareTo(other) == 0;
807     }
808   }
809 }
810 
811 enum FlushType {
812   NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
813 }