View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;

import com.google.common.base.Preconditions;
57  
58  /**
59   * Thread that flushes cache on request
60   *
61   * NOTE: This class extends Thread rather than Chore because the sleep time
62   * can be interrupted when there is something to do, rather than the Chore
63   * sleep time which is invariant.
64   *
65   * @see FlushRequester
66   */
67  @InterfaceAudience.Private
68  class MemStoreFlusher implements FlushRequester {
69    static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
70    static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
71    private static final String MEMSTORE_SIZE_OLD_KEY = 
72        "hbase.regionserver.global.memstore.upperLimit";
73    private static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY = 
74        "hbase.regionserver.global.memstore.size.lower.limit";
75    private static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY = 
76        "hbase.regionserver.global.memstore.lowerLimit";
77  
78    private static final float DEFAULT_MEMSTORE_SIZE = 0.4f;
79    // Default lower water mark limit is 95% size of memstore size.
80    private static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f;
81  
82    // These two data members go together.  Any entry in the one must have
83    // a corresponding entry in the other.
84    private final BlockingQueue<FlushQueueEntry> flushQueue =
85      new DelayQueue<FlushQueueEntry>();
86    private final Map<HRegion, FlushRegionEntry> regionsInQueue =
87      new HashMap<HRegion, FlushRegionEntry>();
88    private AtomicBoolean wakeupPending = new AtomicBoolean();
89  
90    private final long threadWakeFrequency;
91    private final HRegionServer server;
92    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
93    private final Object blockSignal = new Object();
94  
95    protected long globalMemStoreLimit;
96    protected float globalMemStoreLimitLowMarkPercent;
97    protected long globalMemStoreLimitLowMark;
98  
99    private long blockingWaitTime;
100   private final Counter updatesBlockedMsHighWater = new Counter();
101 
102   private final FlushHandler[] flushHandlers;
103   private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
104 
105   /**
106    * @param conf
107    * @param server
108    */
109   public MemStoreFlusher(final Configuration conf,
110       final HRegionServer server) {
111     super();
112     this.server = server;
113     this.threadWakeFrequency =
114       conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
115     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
116     float globalMemStorePercent = getGlobalMemStorePercent(conf);
117     this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
118     this.globalMemStoreLimitLowMarkPercent = 
119         getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
120     this.globalMemStoreLimitLowMark = 
121         (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
122 
123     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
124       90000);
125     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
126     this.flushHandlers = new FlushHandler[handlerCount];
127     LOG.info("globalMemStoreLimit=" +
128       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
129       ", globalMemStoreLimitLowMark=" +
130       StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
131       ", maxHeap=" + StringUtils.humanReadableInt(max));
132   }
133 
134   /**
135    * Retrieve global memstore configured size as percentage of total heap.
136    */
137   static float getGlobalMemStorePercent(final Configuration c) {
138     float limit = c.getFloat(MEMSTORE_SIZE_KEY,
139         c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
140     if (limit > 0.8f || limit < 0.05f) {
141       LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
142           + " because supplied value outside allowed range of 0.05 -> 0.8");
143       limit = DEFAULT_MEMSTORE_SIZE;
144     }
145     return limit;
146   }
147 
  /**
   * Retrieve the low-water mark expressed as a fraction of the global memstore
   * limit. The new key is taken as that fraction directly; the deprecated key
   * is heap-relative and therefore divided by {@code globalMemStorePercent}.
   */
  private static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) {
    String lowMarkPercentStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY);
    if (lowMarkPercentStr != null) {
      // NOTE(review): unlike the deprecated path below, this value is neither
      // range-checked nor clamped (and parseFloat may throw
      // NumberFormatException on bad input) -- confirm operators always
      // supply a fraction in (0, 1].
      return Float.parseFloat(lowMarkPercentStr);
    }
    String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
    if (lowerWaterMarkOldValStr != null) {
      LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
          + MEMSTORE_SIZE_LOWER_LIMIT_KEY);
      float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
      if (lowerWaterMarkOldVal > globalMemStorePercent) {
        // Clamp: the low water mark may not exceed the global limit itself.
        lowerWaterMarkOldVal = globalMemStorePercent;
        LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied "
            + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was > " + MEMSTORE_SIZE_OLD_KEY);
      }
      // Convert heap-relative old value into a fraction of the memstore limit.
      return lowerWaterMarkOldVal / globalMemStorePercent;
    }
    return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
  }
167 
168   public Counter getUpdatesBlockedMsHighWater() {
169     return this.updatesBlockedMsHighWater;
170   }
171 
  /**
   * The memstore across all regions has exceeded the low water mark. Pick
   * one region to flush and flush it synchronously (this is called from the
   * flush thread)
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure() {
    SortedMap<Long, HRegion> regionsBySize =
        server.getCopyOfOnlineRegionsSortedBySize();

    // Regions we already failed to flush this round; skipped on retry.
    Set<HRegion> excludedRegions = new HashSet<HRegion>();

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles
      // (might be null!)
      HRegion bestFlushableRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, true);
      // Find the biggest region, total, even if it might have too many flushes.
      HRegion bestAnyRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, false);

      if (bestAnyRegion == null) {
        // Nothing flushable at all; give up and let the caller retry later.
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;
      if (bestFlushableRegion != null &&
          bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " +
            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
            "store files, but is " +
            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
            " vs best flushable region's " +
            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
            ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      // A selected region must have something in its memstore.
      Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);

      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
      flushedOne = flushRegion(regionToFlush, true);
      if (!flushedOne) {
        // Flush refused (e.g. region closing): exclude it and try the next one.
        LOG.info("Excluding unflushable region " + regionToFlush +
          " - trying to find a different region to flush.");
        excludedRegions.add(regionToFlush);
      }
    }
    return true;
  }
236 
  /**
   * Worker thread that services the flush queue until the server stops.
   * Several handlers may run concurrently ("hbase.hstore.flusher.count").
   */
  private class FlushHandler extends HasThread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe instanceof WakeupFlushThread) {
            // Poll timed out or an explicit wakeup token arrived: re-check
            // global memory pressure instead of flushing a specific region.
            if (isAboveLowWaterMark()) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            // flushRegion returns false only on server abort; stop the handler.
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      // Shutdown: drop all pending work under the same lock requestFlush uses.
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }
293 
294 
295   private void wakeupFlushThread() {
296     if (wakeupPending.compareAndSet(false, true)) {
297       flushQueue.add(new WakeupFlushThread());
298     }
299   }
300 
301   private HRegion getBiggestMemstoreRegion(
302       SortedMap<Long, HRegion> regionsBySize,
303       Set<HRegion> excludedRegions,
304       boolean checkStoreFileCount) {
305     synchronized (regionsInQueue) {
306       for (HRegion region : regionsBySize.values()) {
307         if (excludedRegions.contains(region)) {
308           continue;
309         }
310 
311         if (region.writestate.flushing || !region.writestate.writesEnabled) {
312           continue;
313         }
314 
315         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
316           continue;
317         }
318         return region;
319       }
320     }
321     return null;
322   }
323 
324   /**
325    * Return true if global memory usage is above the high watermark
326    */
327   private boolean isAboveHighWaterMark() {
328     return server.getRegionServerAccounting().
329       getGlobalMemstoreSize() >= globalMemStoreLimit;
330   }
331 
332   /**
333    * Return true if we're above the high watermark
334    */
335   private boolean isAboveLowWaterMark() {
336     return server.getRegionServerAccounting().
337       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
338   }
339 
340   public void requestFlush(HRegion r) {
341     synchronized (regionsInQueue) {
342       if (!regionsInQueue.containsKey(r)) {
343         // This entry has no delay so it will be added at the top of the flush
344         // queue.  It'll come out near immediately.
345         FlushRegionEntry fqe = new FlushRegionEntry(r);
346         this.regionsInQueue.put(r, fqe);
347         this.flushQueue.add(fqe);
348       }
349     }
350   }
351 
352   public void requestDelayedFlush(HRegion r, long delay) {
353     synchronized (regionsInQueue) {
354       if (!regionsInQueue.containsKey(r)) {
355         // This entry has some delay
356         FlushRegionEntry fqe = new FlushRegionEntry(r);
357         fqe.requeue(delay);
358         this.regionsInQueue.put(r, fqe);
359         this.flushQueue.add(fqe);
360       }
361     }
362   }
363 
364   public int getFlushQueueSize() {
365     return flushQueue.size();
366   }
367 
368   /**
369    * Only interrupt once it's done with a run through the work loop.
370    */
371   void interruptIfNecessary() {
372     lock.writeLock().lock();
373     try {
374       for (FlushHandler flushHander : flushHandlers) {
375         if (flushHander != null) flushHander.interrupt();
376       }
377     } finally {
378       lock.writeLock().unlock();
379     }
380   }
381 
  /**
   * Create and start the configured number of flush handler threads.
   * @param eh handler intended for uncaught exceptions in the flush threads
   */
  synchronized void start(UncaughtExceptionHandler eh) {
    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
        server.getServerName().toShortString() + "-MemStoreFlusher", eh);
    for (int i = 0; i < flushHandlers.length; i++) {
      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
      // NOTE(review): the Thread returned by the factory is discarded, and the
      // handler is started on its own thread below -- so the daemon flag and
      // uncaught-exception handler configured in the factory appear never to
      // be applied to the thread that actually runs. Confirm whether this is
      // intentional.
      flusherThreadFactory.newThread(flushHandlers[i]);
      flushHandlers[i].start();
    }
  }
391 
392   boolean isAlive() {
393     for (FlushHandler flushHander : flushHandlers) {
394       if (flushHander != null && flushHander.isAlive()) {
395         return true;
396       }
397     }
398     return false;
399   }
400 
401   void join() {
402     for (FlushHandler flushHander : flushHandlers) {
403       if (flushHander != null) {
404         Threads.shutdown(flushHander.getThread());
405       }
406     }
407   }
408 
  /*
   * A flushRegion that checks store file count.  If too many, puts the flush
   * on delay queue to retry later.
   * @param fqe queue entry naming the region to flush
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the log was
   * not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() &&
        isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        // We have delayed as long as we are willing to; flush regardless.
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTimeMillis() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionNameAsString());
      } else {
        // If this is first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
          // Try to reduce the store-file count while we wait: split if
          // possible, otherwise request a system compaction.
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(
                  region, Thread.currentThread().getName());
            } catch (IOException e) {
              LOG.error(
                "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
                RemoteExceptionHandler.checkIOException(e));
            }
          }
        }

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    return flushRegion(region, false);
  }
453 
  /*
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   *
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the log was
   * not flushed.
   */
  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }
    // Read lock lets flushes proceed concurrently while interruptIfNecessary()
    // excludes new ones under the write lock.
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      boolean shouldCompact = region.flushcache();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }

    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of hlog
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of HLog required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" +
        (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      // Let writers parked in reclaimMemStoreMemory() re-check the pressure.
      wakeUpIfBlocking();
    }
    return true;
  }
509 
510   private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
511     FlushType type = FlushType.NORMAL;
512     if (emergencyFlush) {
513       type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
514     }
515     for (FlushRequestListener listener : flushRequestListeners) {
516       listener.flushRequested(type, region);
517     }
518   }
519 
  /** Release every writer currently blocked in reclaimMemStoreMemory(). */
  private void wakeUpIfBlocking() {
    synchronized (blockSignal) {
      blockSignal.notifyAll();
    }
  }
525 
526   private boolean isTooManyStoreFiles(HRegion region) {
527     for (Store store : region.stores.values()) {
528       if (store.hasTooManyStoreFiles()) {
529         return true;
530       }
531     }
532     return false;
533   }
534 
535   /**
536    * Check if the regionserver's memstore memory usage is greater than the
537    * limit. If so, flush regions with the biggest memstores until we're down
538    * to the lower limit. This method blocks callers until we're down to a safe
539    * amount of memstore consumption.
540    */
541   public void reclaimMemStoreMemory() {
542     TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
543     if (isAboveHighWaterMark()) {
544       if (Trace.isTracing()) {
545         scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
546       }
547       long start = EnvironmentEdgeManager.currentTimeMillis();
548       synchronized (this.blockSignal) {
549         boolean blocked = false;
550         long startTime = 0;
551         boolean interrupted = false;
552         try {
553           while (isAboveHighWaterMark() && !server.isStopped()) {
554             if (!blocked) {
555               startTime = EnvironmentEdgeManager.currentTimeMillis();
556               LOG.info("Blocking updates on " + server.toString() +
557                 ": the global memstore size " +
558                 StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
559                 " is >= than blocking " +
560                 StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
561             }
562             blocked = true;
563             wakeupFlushThread();
564             try {
565               // we should be able to wait forever, but we've seen a bug where
566               // we miss a notify, so put a 5 second bound on it at least.
567               blockSignal.wait(5 * 1000);
568             } catch (InterruptedException ie) {
569               LOG.warn("Interrupted while waiting");
570               interrupted = true;
571             }
572             long took = EnvironmentEdgeManager.currentTimeMillis() - start;
573             LOG.warn("Memstore is above high water mark and block " + took + "ms");
574           }
575         } finally {
576           if (interrupted) {
577             Thread.currentThread().interrupt();
578           }
579         }
580 
581         if(blocked){
582           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
583           if(totalTime > 0){
584             this.updatesBlockedMsHighWater.add(totalTime);
585           }
586           LOG.info("Unblocking updates for server " + server.toString());
587         }
588       }
589     } else if (isAboveLowWaterMark()) {
590       wakeupFlushThread();
591     }
592     scope.close();
593   }
594   @Override
595   public String toString() {
596     return "flush_queue="
597         + flushQueue.size();
598   }
599 
600   public String dumpQueue() {
601     StringBuilder queueList = new StringBuilder();
602     queueList.append("Flush Queue Queue dump:\n");
603     queueList.append("  Flush Queue:\n");
604     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
605 
606     while(it.hasNext()){
607       queueList.append("    "+it.next().toString());
608       queueList.append("\n");
609     }
610 
611     return queueList.toString();
612   }
613 
614   /**
615    * Register a MemstoreFlushListener
616    * @param listener
617    */
618   public void registerFlushRequestListener(final FlushRequestListener listener) {
619     this.flushRequestListeners.add(listener);
620   }
621 
622   /**
623    * Unregister the listener from MemstoreFlushListeners
624    * @param listener
625    * @return true when passed listener is unregistered successfully.
626    */
627   public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
628     return this.flushRequestListeners.remove(listener);
629   }
630 
631   /**
632    * Sets the global memstore limit to a new size.
633    * @param globalMemStoreSize
634    */
635   public void setGlobalMemstoreLimit(long globalMemStoreSize) {
636     this.globalMemStoreLimit = globalMemStoreSize;
637     this.globalMemStoreLimitLowMark = 
638         (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
639     reclaimMemStoreMemory();
640   }
641 
642   public long getMemoryLimit() {
643     return this.globalMemStoreLimit;
644   }
645 
  /** Marker type for entries accepted by the flush {@link DelayQueue}. */
  interface FlushQueueEntry extends Delayed {}
647 
  /**
   * Token to insert into the flush queue that ensures that the flusher does not sleep
   */
  static class WakeupFlushThread implements FlushQueueEntry {
    // Zero delay: the token is immediately available from the DelayQueue.
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

    // Always sorts first, so a wakeup token jumps ahead of delayed entries.
    // NOTE(review): unconditionally returning -1 violates the Comparable
    // contract (a.compareTo(b) and b.compareTo(a) are both negative) and is
    // inconsistent with equals(); it looks deliberate as an ordering trick,
    // but confirm DelayQueue behavior is acceptable with it.
    @Override
    public int compareTo(Delayed o) {
      return -1;
    }

    // Identity equality: every wakeup token is distinct.
    @Override
    public boolean equals(Object obj) {
      return (this == obj);
    }
  }
668 
669   /**
670    * Datastructure used in the flush queue.  Holds region and retry count.
671    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
672    * construction, the delay is zero. When added to a delay queue, we'll come
673    * out near immediately.  Call {@link #requeue(long)} passing delay in
674    * milliseconds before readding to delay queue if you want it to stay there
675    * a while.
676    */
677   static class FlushRegionEntry implements FlushQueueEntry {
678     private final HRegion region;
679 
680     private final long createTime;
681     private long whenToExpire;
682     private int requeueCount = 0;
683 
684     FlushRegionEntry(final HRegion r) {
685       this.region = r;
686       this.createTime = EnvironmentEdgeManager.currentTimeMillis();
687       this.whenToExpire = this.createTime;
688     }
689 
690     /**
691      * @param maximumWait
692      * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
693      */
694     public boolean isMaximumWait(final long maximumWait) {
695       return (EnvironmentEdgeManager.currentTimeMillis() - this.createTime) > maximumWait;
696     }
697 
698     /**
699      * @return Count of times {@link #requeue(long)} was called; i.e this is
700      * number of times we've been requeued.
701      */
702     public int getRequeueCount() {
703       return this.requeueCount;
704     }
705 
706     /**
707      * @param when When to expire, when to come up out of the queue.
708      * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTimeMillis()
709      * to whatever you pass.
710      * @return This.
711      */
712     public FlushRegionEntry requeue(final long when) {
713       this.whenToExpire = EnvironmentEdgeManager.currentTimeMillis() + when;
714       this.requeueCount++;
715       return this;
716     }
717 
718     @Override
719     public long getDelay(TimeUnit unit) {
720       return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTimeMillis(),
721           TimeUnit.MILLISECONDS);
722     }
723 
724     @Override
725     public int compareTo(Delayed other) {
726       return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
727         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
728     }
729 
730     @Override
731     public String toString() {
732       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
733     }
734 
735     @Override
736     public boolean equals(Object obj) {
737       if (this == obj) {
738         return true;
739       }
740       if (obj == null || getClass() != obj.getClass()) {
741         return false;
742       }
743       Delayed other = (Delayed) obj;
744       return compareTo(other) == 0;
745     }
746   }
747 }
748 
/**
 * Classifies a flush request relative to global memory pressure: a normal
 * request, or an emergency flush issued above the lower or higher water mark.
 */
enum FlushType {
  NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
}