/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.htrace.Trace;
import org.htrace.TraceScope;

import com.google.common.base.Preconditions;

/**
 * Thread that flushes cache on request.
 *
 * NOTE: The flush handlers run in their own threads rather than as a Chore
 * because their sleep can be interrupted whenever there is something to do,
 * whereas a Chore's sleep time is invariant.
 *
 * @see FlushRequester
 */
@InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester {
  static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);

  // These two data members go together.  Any entry in the one must have
  // a corresponding entry in the other.
  private final BlockingQueue<FlushQueueEntry> flushQueue =
    new DelayQueue<FlushQueueEntry>();
  private final Map<HRegion, FlushRegionEntry> regionsInQueue =
    new HashMap<HRegion, FlushRegionEntry>();
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  private final long threadWakeFrequency;
  private final HRegionServer server;
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
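  // Writers block on this monitor (in reclaimMemStoreMemory) while the global memstore
  // is above the high-water mark; wakeUpIfBlocking() notifies it whenever a flush completes.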
  private final Object blockSignal = new Object();

  protected long globalMemStoreLimit;
  protected float globalMemStoreLimitLowMarkPercent;
  protected long globalMemStoreLimitLowMark;

  private long blockingWaitTime;
  private final Counter updatesBlockedMsHighWater = new Counter();

  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);

  /**
   * @param conf the region server's configuration
   * @param server the region server this flusher serves
   */
  public MemStoreFlusher(final Configuration conf,
      final HRegionServer server) {
    super();
    this.server = server;
    this.threadWakeFrequency =
      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
    this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
    this.globalMemStoreLimitLowMarkPercent =
        HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
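    // Note: the low-water mark percent is relative to the global memstore limit,
    // not to the whole heap, so the low mark is a fraction of the limit computed above.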
    this.globalMemStoreLimitLowMark =
        (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);

    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
      90000);
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    this.flushHandlers = new FlushHandler[handlerCount];
    LOG.info("globalMemStoreLimit=" +
      StringUtils.humanReadableInt(this.globalMemStoreLimit) +
      ", globalMemStoreLimitLowMark=" +
      StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
      ", maxHeap=" + StringUtils.humanReadableInt(max));
  }

  public Counter getUpdatesBlockedMsHighWater() {
    return this.updatesBlockedMsHighWater;
  }

  /**
   * The memstore across all regions has exceeded the low water mark. Pick
   * one region to flush and flush it synchronously (this is called from the
   * flush thread)
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure() {
    SortedMap<Long, HRegion> regionsBySize =
        server.getCopyOfOnlineRegionsSortedBySize();

    Set<HRegion> excludedRegions = new HashSet<HRegion>();

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles
      // (might be null!)
      HRegion bestFlushableRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, true);
      // Find the biggest region overall, even if it might have too many store files.
      HRegion bestAnyRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, false);

      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;
      if (bestFlushableRegion != null &&
          bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " +
            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
            "store files, but is " +
            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
            " vs best flushable region's " +
            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
            ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);

      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
      flushedOne = flushRegion(regionToFlush, true);
      if (!flushedOne) {
        LOG.info("Excluding unflushable region " + regionToFlush +
          " - trying to find a different region to flush.");
        excludedRegions.add(regionToFlush);
      }
    }
    return true;
  }

  private class FlushHandler extends HasThread {

    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe instanceof WakeupFlushThread) {
            if (isAboveLowWaterMark()) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
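          // flushRegion returns false only when the server is aborting or the filesystem
          // check failed, so fall out of the work loop in that case.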
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }

  private void wakeupFlushThread() {
    if (wakeupPending.compareAndSet(false, true)) {
      flushQueue.add(new WakeupFlushThread());
    }
  }

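  // regionsBySize is sorted largest memstore first, so the first region that passes the
  // filters below is the biggest one we are allowed to flush.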
  private HRegion getBiggestMemstoreRegion(
      SortedMap<Long, HRegion> regionsBySize,
      Set<HRegion> excludedRegions,
      boolean checkStoreFileCount) {
    synchronized (regionsInQueue) {
      for (HRegion region : regionsBySize.values()) {
        if (excludedRegions.contains(region)) {
          continue;
        }

        if (region.writestate.flushing || !region.writestate.writesEnabled) {
          continue;
        }

        if (checkStoreFileCount && isTooManyStoreFiles(region)) {
          continue;
        }
        return region;
      }
    }
    return null;
  }

  /**
   * Return true if global memory usage is above the high watermark
   */
  private boolean isAboveHighWaterMark() {
    return server.getRegionServerAccounting().
      getGlobalMemstoreSize() >= globalMemStoreLimit;
  }

  /**
   * Return true if global memory usage is above the low watermark
   */
  private boolean isAboveLowWaterMark() {
    return server.getRegionServerAccounting().
      getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
  }

  public void requestFlush(HRegion r) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue.  It'll come out near immediately.
        FlushRegionEntry fqe = new FlushRegionEntry(r);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }

  public void requestDelayedFlush(HRegion r, long delay) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
        FlushRegionEntry fqe = new FlushRegionEntry(r);
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }

  public int getFlushQueueSize() {
    return flushQueue.size();
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
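    // Taking the write lock waits out any in-progress flush (which holds the read lock),
    // so handlers are only interrupted between flushes.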
    lock.writeLock().lock();
    try {
      for (FlushHandler flushHandler : flushHandlers) {
        if (flushHandler != null) flushHandler.interrupt();
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  synchronized void start(UncaughtExceptionHandler eh) {
    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
        server.getServerName().toShortString() + "-MemStoreFlusher", eh);
    for (int i = 0; i < flushHandlers.length; i++) {
      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
      flusherThreadFactory.newThread(flushHandlers[i]);
      flushHandlers[i].start();
    }
  }

  boolean isAlive() {
    for (FlushHandler flushHandler : flushHandlers) {
      if (flushHandler != null && flushHandler.isAlive()) {
        return true;
      }
    }
    return false;
  }

  void join() {
    for (FlushHandler flushHandler : flushHandlers) {
      if (flushHandler != null) {
        Threads.shutdown(flushHandler.getThread());
      }
    }
  }

  /*
   * A flushRegion that checks store file count.  If too many, puts the flush
   * on delay queue to retry later.
   * @param fqe the flush request to process
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() &&
        isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionNameAsString());
      } else {
        // If this is first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(
                  region, Thread.currentThread().getName());
            } catch (IOException e) {
              e = e instanceof RemoteException ?
                  ((RemoteException) e).unwrapRemoteException() : e;
              LOG.error("Cache flush failed for region " +
                Bytes.toStringBinary(region.getRegionName()), e);
            }
          }
        }

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    return flushRegion(region, false);
  }

  /*
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, we were called from the
   * main flusher run loop and got the entry to flush by polling the flush queue
   * (which already removed it).
   *
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    long startTime = 0;
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {
        startTime = fqe.createTime;
      }
      if (fqe != null && emergencyFlush) {
        // Need to remove the region from the delay queue.  When NOT an
        // emergencyFlush, the item was already removed via flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }
    if (startTime == 0) {
      // Only fetch the system time when we didn't find a FlushRegionEntry;
      // shame we can't capture the time also spent in the above synchronized
      // block
      startTime = EnvironmentEdgeManager.currentTime();
    }
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      HRegion.FlushResult flushResult = region.flushcache();
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
      LOG.error(
        "Cache flush failed"
            + (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName()))
                : ""), ex);
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
    }
    return true;
  }

  private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
    FlushType type = FlushType.NORMAL;
    if (emergencyFlush) {
      type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
    }
    for (FlushRequestListener listener : flushRequestListeners) {
      listener.flushRequested(type, region);
    }
  }

  private void wakeUpIfBlocking() {
    synchronized (blockSignal) {
      blockSignal.notifyAll();
    }
  }

  private boolean isTooManyStoreFiles(HRegion region) {
    for (Store store : region.stores.values()) {
      if (store.hasTooManyStoreFiles()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check if the regionserver's memstore memory usage is greater than the
   * limit. If so, flush regions with the biggest memstores until we're down
   * to the lower limit. This method blocks callers until we're down to a safe
   * amount of memstore consumption.
   */
  public void reclaimMemStoreMemory() {
    TraceScope scope = Trace.startSpan("MemStoreFlusher.reclaimMemStoreMemory");
    if (isAboveHighWaterMark()) {
      if (Trace.isTracing()) {
        scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
      }
      long start = EnvironmentEdgeManager.currentTime();
      synchronized (this.blockSignal) {
        boolean blocked = false;
        long startTime = 0;
        boolean interrupted = false;
        try {
          while (isAboveHighWaterMark() && !server.isStopped()) {
            if (!blocked) {
              startTime = EnvironmentEdgeManager.currentTime();
              LOG.info("Blocking updates on " + server.toString() +
                ": the global memstore size " +
                StringUtils.humanReadableInt(
                  server.getRegionServerAccounting().getGlobalMemstoreSize()) +
                " is >= blocking size " +
                StringUtils.humanReadableInt(globalMemStoreLimit));
            }
            blocked = true;
            wakeupFlushThread();
            try {
              // we should be able to wait forever, but we've seen a bug where
              // we miss a notify, so put a 5 second bound on it at least.
              blockSignal.wait(5 * 1000);
            } catch (InterruptedException ie) {
              LOG.warn("Interrupted while waiting");
              interrupted = true;
            }
            long took = EnvironmentEdgeManager.currentTime() - start;
            LOG.warn("Memstore is above high water mark; updates have been blocked for "
              + took + "ms");
          }
        } finally {
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }

        if (blocked) {
          final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
          if (totalTime > 0) {
            this.updatesBlockedMsHighWater.add(totalTime);
          }
          LOG.info("Unblocking updates for server " + server.toString());
        }
      }
    } else if (isAboveLowWaterMark()) {
      wakeupFlushThread();
    }
    scope.close();
  }

  @Override
  public String toString() {
    return "flush_queue=" + flushQueue.size();
  }

  public String dumpQueue() {
    StringBuilder queueList = new StringBuilder();
    queueList.append("Flush Queue dump:\n");
    queueList.append("  Flush Queue:\n");
    java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();

    while (it.hasNext()) {
      queueList.append("    " + it.next().toString());
      queueList.append("\n");
    }

    return queueList.toString();
  }


  /**
   * Register a FlushRequestListener
   * @param listener the listener to notify of flush requests
   */
  public void registerFlushRequestListener(final FlushRequestListener listener) {
    this.flushRequestListeners.add(listener);
  }

  /**
   * Unregister the given FlushRequestListener
   * @param listener the listener to remove
   * @return true when the passed listener was unregistered successfully.
   */
  public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
    return this.flushRequestListeners.remove(listener);
  }

  /**
   * Sets the global memstore limit to a new size.
   * @param globalMemStoreSize the new global memstore limit, in bytes
   */
  public void setGlobalMemstoreLimit(long globalMemStoreSize) {
    this.globalMemStoreLimit = globalMemStoreSize;
    this.globalMemStoreLimitLowMark =
        (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
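    // The limit may have shrunk, so immediately re-check memory pressure under the new marks.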
    reclaimMemStoreMemory();
  }

  public long getMemoryLimit() {
    return this.globalMemStoreLimit;
  }

  interface FlushQueueEntry extends Delayed {}

  /**
   * Token to insert into the flush queue that ensures that the flusher does not sleep
   */
  static class WakeupFlushThread implements FlushQueueEntry {
    @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }

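    // Always sorts ahead of any other entry; together with the zero delay, a handler
    // picks the wakeup token up as soon as it polls the queue.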
    @Override
    public int compareTo(Delayed o) {
      return -1;
    }

    @Override
    public boolean equals(Object obj) {
      return (this == obj);
    }

  }

  /**
   * Data structure used in the flush queue.  Holds region and retry count.
   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
   * construction, the delay is zero. When added to a delay queue, we'll come
   * out near immediately.  Call {@link #requeue(long)} passing delay in
   * milliseconds before re-adding to the delay queue if you want it to stay
   * there a while.
   */
  static class FlushRegionEntry implements FlushQueueEntry {
    private final HRegion region;

    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;

    FlushRegionEntry(final HRegion r) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
    }

    /**
     * @param maximumWait the maximum wait, in milliseconds
     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
     */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }

    /**
     * @return Count of times {@link #requeue(long)} was called; i.e. the
     * number of times we've been requeued.
     */
    public int getRequeueCount() {
      return this.requeueCount;
    }

    /**
     * @param when Delay, in milliseconds, before this entry should come back out
     * of the queue; it is added to EnvironmentEdgeManager.currentTime() to compute
     * the new expiry time.
     * @return This.
     */
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
          TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
    }

    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
    }

    @Override
    public int hashCode() {
      return (int) getDelay(TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      Delayed other = (Delayed) obj;
      return compareTo(other) == 0;
    }
  }
}

enum FlushType {
  NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
}