View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.lang.Thread.UncaughtExceptionHandler;
23  import java.lang.management.ManagementFactory;
24  import java.util.ArrayList;
25  import java.util.ConcurrentModificationException;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.SortedMap;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.DelayQueue;
34  import java.util.concurrent.Delayed;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.locks.ReentrantReadWriteLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.DroppedSnapshotException;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.Counter;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  import org.apache.hadoop.hbase.util.HasThread;
51  import org.apache.hadoop.hbase.util.Threads;
52  import org.apache.hadoop.ipc.RemoteException;
53  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
54  import org.htrace.Trace;
55  import org.htrace.TraceScope;
56  
57  import com.google.common.base.Preconditions;
58  
/**
 * Flushes memstores (the cache) on request, using a set of flush handler
 * threads.
 *
 * NOTE: The flush handlers run on their own threads (see the FlushHandler
 * inner class, which extends HasThread) rather than as a Chore because their
 * sleep time can be interrupted when there is something to do, whereas the
 * Chore sleep time is invariant.
 *
 * @see FlushRequester
 */
68  @InterfaceAudience.Private
69  class MemStoreFlusher implements FlushRequester {
70    static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
71  
72    // These two data members go together.  Any entry in the one must have
73    // a corresponding entry in the other.
74    private final BlockingQueue<FlushQueueEntry> flushQueue =
75      new DelayQueue<FlushQueueEntry>();
76    private final Map<HRegion, FlushRegionEntry> regionsInQueue =
77      new HashMap<HRegion, FlushRegionEntry>();
78    private AtomicBoolean wakeupPending = new AtomicBoolean();
79  
80    private final long threadWakeFrequency;
81    private final HRegionServer server;
82    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
83    private final Object blockSignal = new Object();
84  
85    protected long globalMemStoreLimit;
86    protected float globalMemStoreLimitLowMarkPercent;
87    protected long globalMemStoreLimitLowMark;
88  
89    private long blockingWaitTime;
90    private final Counter updatesBlockedMsHighWater = new Counter();
91  
92    private final FlushHandler[] flushHandlers;
93    private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
94  
95    /**
96     * @param conf
97     * @param server
98     */
99    public MemStoreFlusher(final Configuration conf,
100       final HRegionServer server) {
101     super();
102     this.server = server;
103     this.threadWakeFrequency =
104       conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
105     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
106     float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
107     this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
108     this.globalMemStoreLimitLowMarkPercent = 
109         HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
110     this.globalMemStoreLimitLowMark = 
111         (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
112 
113     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
114       90000);
115     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
116     this.flushHandlers = new FlushHandler[handlerCount];
117     LOG.info("globalMemStoreLimit="
118         + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
119         + ", globalMemStoreLimitLowMark="
120         + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
121         + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
122   }
123 
124   public Counter getUpdatesBlockedMsHighWater() {
125     return this.updatesBlockedMsHighWater;
126   }
127 
128   /**
129    * The memstore across all regions has exceeded the low water mark. Pick
130    * one region to flush and flush it synchronously (this is called from the
131    * flush thread)
132    * @return true if successful
133    */
134   private boolean flushOneForGlobalPressure() {
135     SortedMap<Long, HRegion> regionsBySize =
136         server.getCopyOfOnlineRegionsSortedBySize();
137 
138     Set<HRegion> excludedRegions = new HashSet<HRegion>();
139 
140     boolean flushedOne = false;
141     while (!flushedOne) {
142       // Find the biggest region that doesn't have too many storefiles
143       // (might be null!)
144       HRegion bestFlushableRegion = getBiggestMemstoreRegion(
145           regionsBySize, excludedRegions, true);
146       // Find the biggest region, total, even if it might have too many flushes.
147       HRegion bestAnyRegion = getBiggestMemstoreRegion(
148           regionsBySize, excludedRegions, false);
149 
150       if (bestAnyRegion == null) {
151         LOG.error("Above memory mark but there are no flushable regions!");
152         return false;
153       }
154 
155       HRegion regionToFlush;
156       if (bestFlushableRegion != null &&
157           bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
158         // Even if it's not supposed to be flushed, pick a region if it's more than twice
159         // as big as the best flushable one - otherwise when we're under pressure we make
160         // lots of little flushes and cause lots of compactions, etc, which just makes
161         // life worse!
162         if (LOG.isDebugEnabled()) {
163           LOG.debug("Under global heap pressure: " + "Region "
164               + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is "
165               + TraditionalBinaryPrefix.long2String(bestAnyRegion.memstoreSize.get(), "", 1)
166               + " vs best flushable region's "
167               + TraditionalBinaryPrefix.long2String(bestFlushableRegion.memstoreSize.get(), "", 1)
168               + ". Choosing the bigger.");
169         }
170         regionToFlush = bestAnyRegion;
171       } else {
172         if (bestFlushableRegion == null) {
173           regionToFlush = bestAnyRegion;
174         } else {
175           regionToFlush = bestFlushableRegion;
176         }
177       }
178 
179       Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
180 
181       LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
182       flushedOne = flushRegion(regionToFlush, true, true);
183       if (!flushedOne) {
184         LOG.info("Excluding unflushable region " + regionToFlush +
185           " - trying to find a different region to flush.");
186         excludedRegions.add(regionToFlush);
187       }
188     }
189     return true;
190   }
191 
  /**
   * Worker that drains the flush queue. Each handler loops until the server is
   * stopped (or a flush reports an unrecoverable failure), flushing queued
   * regions and, when woken without a region entry, flushing one region for
   * global memory pressure if usage is at or above the low water mark.
   */
  private class FlushHandler extends HasThread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          // Wait at most threadWakeFrequency ms for an entry whose delay expired.
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          // Either the poll timed out or we were handed a wakeup token: there is
          // no specific region to flush, so only check global memory pressure.
          if (fqe == null || fqe instanceof WakeupFlushThread) {
            if (isAboveLowWaterMark()) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          // A false return means the flush hit an unrecoverable problem
          // (server abort or failed filesystem check); stop this handler.
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          // Interrupted while polling or sleeping: go back to the loop
          // condition, which re-checks whether the server is stopped.
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      // This handler is exiting: drop all pending requests, keeping the map
      // and the queue consistent with each other.
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }
248 
249 
250   private void wakeupFlushThread() {
251     if (wakeupPending.compareAndSet(false, true)) {
252       flushQueue.add(new WakeupFlushThread());
253     }
254   }
255 
256   private HRegion getBiggestMemstoreRegion(
257       SortedMap<Long, HRegion> regionsBySize,
258       Set<HRegion> excludedRegions,
259       boolean checkStoreFileCount) {
260     synchronized (regionsInQueue) {
261       for (HRegion region : regionsBySize.values()) {
262         if (excludedRegions.contains(region)) {
263           continue;
264         }
265 
266         if (region.writestate.flushing || !region.writestate.writesEnabled) {
267           continue;
268         }
269 
270         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
271           continue;
272         }
273         return region;
274       }
275     }
276     return null;
277   }
278 
279   /**
280    * Return true if global memory usage is above the high watermark
281    */
282   private boolean isAboveHighWaterMark() {
283     return server.getRegionServerAccounting().
284       getGlobalMemstoreSize() >= globalMemStoreLimit;
285   }
286 
287   /**
288    * Return true if we're above the high watermark
289    */
290   private boolean isAboveLowWaterMark() {
291     return server.getRegionServerAccounting().
292       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
293   }
294 
295   public void requestFlush(HRegion r, boolean forceFlushAllStores) {
296     synchronized (regionsInQueue) {
297       if (!regionsInQueue.containsKey(r)) {
298         // This entry has no delay so it will be added at the top of the flush
299         // queue.  It'll come out near immediately.
300         FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
301         this.regionsInQueue.put(r, fqe);
302         this.flushQueue.add(fqe);
303       }
304     }
305   }
306 
307   public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
308     synchronized (regionsInQueue) {
309       if (!regionsInQueue.containsKey(r)) {
310         // This entry has some delay
311         FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
312         fqe.requeue(delay);
313         this.regionsInQueue.put(r, fqe);
314         this.flushQueue.add(fqe);
315       }
316     }
317   }
318 
319   public int getFlushQueueSize() {
320     return flushQueue.size();
321   }
322 
323   /**
324    * Only interrupt once it's done with a run through the work loop.
325    */
326   void interruptIfNecessary() {
327     lock.writeLock().lock();
328     try {
329       for (FlushHandler flushHander : flushHandlers) {
330         if (flushHander != null) flushHander.interrupt();
331       }
332     } finally {
333       lock.writeLock().unlock();
334     }
335   }
336 
337   synchronized void start(UncaughtExceptionHandler eh) {
338     ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
339         server.getServerName().toShortString() + "-MemStoreFlusher", eh);
340     for (int i = 0; i < flushHandlers.length; i++) {
341       flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
342       flusherThreadFactory.newThread(flushHandlers[i]);
343       flushHandlers[i].start();
344     }
345   }
346 
347   boolean isAlive() {
348     for (FlushHandler flushHander : flushHandlers) {
349       if (flushHander != null && flushHander.isAlive()) {
350         return true;
351       }
352     }
353     return false;
354   }
355 
356   void join() {
357     for (FlushHandler flushHander : flushHandlers) {
358       if (flushHander != null) {
359         Threads.shutdown(flushHander.getThread());
360       }
361     }
362   }
363 
364   /**
365    * A flushRegion that checks store file count.  If too many, puts the flush
366    * on delay queue to retry later.
367    * @param fqe
368    * @return true if the region was successfully flushed, false otherwise. If
369    * false, there will be accompanying log messages explaining why the log was
370    * not flushed.
371    */
372   private boolean flushRegion(final FlushRegionEntry fqe) {
373     HRegion region = fqe.region;
374     if (!region.getRegionInfo().isMetaRegion() &&
375         isTooManyStoreFiles(region)) {
376       if (fqe.isMaximumWait(this.blockingWaitTime)) {
377         LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
378           "ms on a compaction to clean up 'too many store files'; waited " +
379           "long enough... proceeding with flush of " +
380           region.getRegionNameAsString());
381       } else {
382         // If this is first time we've been put off, then emit a log message.
383         if (fqe.getRequeueCount() <= 0) {
384           // Note: We don't impose blockingStoreFiles constraint on meta regions
385           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
386             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
387           if (!this.server.compactSplitThread.requestSplit(region)) {
388             try {
389               this.server.compactSplitThread.requestSystemCompaction(
390                   region, Thread.currentThread().getName());
391             } catch (IOException e) {
392               e = e instanceof RemoteException ?
393                       ((RemoteException)e).unwrapRemoteException() : e;
394               LOG.error(
395                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
396                 e);
397             }
398           }
399         }
400 
401         // Put back on the queue.  Have it come back out of the queue
402         // after a delay of this.blockingWaitTime / 100 ms.
403         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
404         // Tell a lie, it's not flushed but it's ok
405         return true;
406       }
407     }
408     return flushRegion(region, false, fqe.isForceFlushAllStores());
409   }
410 
  /**
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   * @param forceFlushAllStores whether we want to flush all store.
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the log was
   * not flushed. Note that true is also returned after an IOException when the
   * subsequent filesystem check passes.
   */
  private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    long startTime = 0;
    // Drop the region's bookkeeping entry (and, for emergency flushes, its
    // queue entry) under the same monitor the request methods use.
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {
        startTime = fqe.createTime;
      }
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
     }
    }
    if (startTime == 0) {
      // Avoid getting the system time unless we don't have a FlushRegionEntry;
      // shame we can't capture the time also spent in the above synchronized
      // block
      startTime = EnvironmentEdgeManager.currentTime();
    }
    // Flushes share the read lock; interruptIfNecessary() takes the write lock
    // and therefore waits until no flush is in progress.
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      // A split request takes precedence over a compaction request.
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
      if (flushResult.isFlushSucceeded()) {
        // Reported time is measured from the queue entry's creation when one existed.
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error(
        "Cache flush failed"
            + (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName()))
                : ""), ex);
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      // Whatever the outcome, let writers blocked in reclaimMemStoreMemory()
      // re-check the global memstore size.
      wakeUpIfBlocking();
    }
    return true;
  }
484 
485   private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
486     FlushType type = FlushType.NORMAL;
487     if (emergencyFlush) {
488       type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
489     }
490     for (FlushRequestListener listener : flushRequestListeners) {
491       listener.flushRequested(type, region);
492     }
493   }
494 
495   private void wakeUpIfBlocking() {
496     synchronized (blockSignal) {
497       blockSignal.notifyAll();
498     }
499   }
500 
501   private boolean isTooManyStoreFiles(HRegion region) {
502     for (Store store : region.stores.values()) {
503       if (store.hasTooManyStoreFiles()) {
504         return true;
505       }
506     }
507     return false;
508   }
509 
510   /**
511    * Check if the regionserver's memstore memory usage is greater than the
512    * limit. If so, flush regions with the biggest memstores until we're down
513    * to the lower limit. This method blocks callers until we're down to a safe
514    * amount of memstore consumption.
515    */
516   public void reclaimMemStoreMemory() {
517     TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
518     if (isAboveHighWaterMark()) {
519       if (Trace.isTracing()) {
520         scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
521       }
522       long start = EnvironmentEdgeManager.currentTime();
523       synchronized (this.blockSignal) {
524         boolean blocked = false;
525         long startTime = 0;
526         boolean interrupted = false;
527         try {
528           while (isAboveHighWaterMark() && !server.isStopped()) {
529             if (!blocked) {
530               startTime = EnvironmentEdgeManager.currentTime();
531               LOG.info("Blocking updates on "
532                   + server.toString()
533                   + ": the global memstore size "
534                   + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
535                       .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
536                   + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
537             }
538             blocked = true;
539             wakeupFlushThread();
540             try {
541               // we should be able to wait forever, but we've seen a bug where
542               // we miss a notify, so put a 5 second bound on it at least.
543               blockSignal.wait(5 * 1000);
544             } catch (InterruptedException ie) {
545               LOG.warn("Interrupted while waiting");
546               interrupted = true;
547             }
548             long took = EnvironmentEdgeManager.currentTime() - start;
549             LOG.warn("Memstore is above high water mark and block " + took + "ms");
550           }
551         } finally {
552           if (interrupted) {
553             Thread.currentThread().interrupt();
554           }
555         }
556 
557         if(blocked){
558           final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
559           if(totalTime > 0){
560             this.updatesBlockedMsHighWater.add(totalTime);
561           }
562           LOG.info("Unblocking updates for server " + server.toString());
563         }
564       }
565     } else if (isAboveLowWaterMark()) {
566       wakeupFlushThread();
567     }
568     scope.close();
569   }
570   @Override
571   public String toString() {
572     return "flush_queue="
573         + flushQueue.size();
574   }
575 
576   public String dumpQueue() {
577     StringBuilder queueList = new StringBuilder();
578     queueList.append("Flush Queue Queue dump:\n");
579     queueList.append("  Flush Queue:\n");
580     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
581 
582     while(it.hasNext()){
583       queueList.append("    "+it.next().toString());
584       queueList.append("\n");
585     }
586 
587     return queueList.toString();
588   }
589 
590   /**
591    * Register a MemstoreFlushListener
592    * @param listener
593    */
594   public void registerFlushRequestListener(final FlushRequestListener listener) {
595     this.flushRequestListeners.add(listener);
596   }
597 
598   /**
599    * Unregister the listener from MemstoreFlushListeners
600    * @param listener
601    * @return true when passed listener is unregistered successfully.
602    */
603   public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
604     return this.flushRequestListeners.remove(listener);
605   }
606 
607   /**
608    * Sets the global memstore limit to a new size.
609    * @param globalMemStoreSize
610    */
611   public void setGlobalMemstoreLimit(long globalMemStoreSize) {
612     this.globalMemStoreLimit = globalMemStoreSize;
613     this.globalMemStoreLimitLowMark = 
614         (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
615     reclaimMemStoreMemory();
616   }
617 
618   public long getMemoryLimit() {
619     return this.globalMemStoreLimit;
620   }
621 
  /**
   * Marker type for everything that can be put on the flush queue: region
   * flush requests ({@link FlushRegionEntry}) and wakeup tokens
   * ({@link WakeupFlushThread}).
   */
  interface FlushQueueEntry extends Delayed {}
623 
624   /**
625    * Token to insert into the flush queue that ensures that the flusher does not sleep
626    */
627   static class WakeupFlushThread implements FlushQueueEntry {
628     @Override
629     public long getDelay(TimeUnit unit) {
630       return 0;
631     }
632 
633     @Override
634     public int compareTo(Delayed o) {
635       return -1;
636     }
637 
638     @Override
639     public boolean equals(Object obj) {
640       return (this == obj);
641     }
642 
643   }
644 
645   /**
646    * Datastructure used in the flush queue.  Holds region and retry count.
647    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
648    * construction, the delay is zero. When added to a delay queue, we'll come
649    * out near immediately.  Call {@link #requeue(long)} passing delay in
650    * milliseconds before readding to delay queue if you want it to stay there
651    * a while.
652    */
653   static class FlushRegionEntry implements FlushQueueEntry {
654     private final HRegion region;
655 
656     private final long createTime;
657     private long whenToExpire;
658     private int requeueCount = 0;
659 
660     private boolean forceFlushAllStores;
661 
662     FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) {
663       this.region = r;
664       this.createTime = EnvironmentEdgeManager.currentTime();
665       this.whenToExpire = this.createTime;
666       this.forceFlushAllStores = forceFlushAllStores;
667     }
668 
669     /**
670      * @param maximumWait
671      * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
672      */
673     public boolean isMaximumWait(final long maximumWait) {
674       return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
675     }
676 
677     /**
678      * @return Count of times {@link #requeue(long)} was called; i.e this is
679      * number of times we've been requeued.
680      */
681     public int getRequeueCount() {
682       return this.requeueCount;
683     }
684 
685     /**
686      * @return whether we need to flush all stores.
687      */
688     public boolean isForceFlushAllStores() {
689       return forceFlushAllStores;
690     }
691 
692     /**
693      * @param when When to expire, when to come up out of the queue.
694      * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
695      * to whatever you pass.
696      * @return This.
697      */
698     public FlushRegionEntry requeue(final long when) {
699       this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
700       this.requeueCount++;
701       return this;
702     }
703 
704     @Override
705     public long getDelay(TimeUnit unit) {
706       return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
707           TimeUnit.MILLISECONDS);
708     }
709 
710     @Override
711     public int compareTo(Delayed other) {
712       return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
713         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
714     }
715 
716     @Override
717     public String toString() {
718       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
719     }
720 
721     @Override
722     public int hashCode() {
723       return (int) getDelay(TimeUnit.MILLISECONDS);
724     }
725 
726    @Override
727     public boolean equals(Object obj) {
728       if (this == obj) {
729         return true;
730       }
731       if (obj == null || getClass() != obj.getClass()) {
732         return false;
733       }
734       Delayed other = (Delayed) obj;
735       return compareTo(other) == 0;
736     }
737   }
738 }
739 
/**
 * Reason reported to flush request listeners for a flush.
 */
enum FlushType {
  /** Reported for flushes that are not emergency flushes. */
  NORMAL,
  /** Reported for an emergency flush while usage is not above the high water mark. */
  ABOVE_LOWER_MARK,
  /** Reported for an emergency flush while usage is above the high water mark. */
  ABOVE_HIGHER_MARK
}