1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.lang.Thread.UncaughtExceptionHandler;
23  import java.lang.management.ManagementFactory;
24  import java.util.ArrayList;
25  import java.util.ConcurrentModificationException;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.SortedMap;
32  import java.util.concurrent.BlockingQueue;
33  import java.util.concurrent.DelayQueue;
34  import java.util.concurrent.Delayed;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.locks.ReentrantReadWriteLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.hbase.DroppedSnapshotException;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.Counter;
49  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50  import org.apache.hadoop.hbase.util.HasThread;
51  import org.apache.hadoop.hbase.util.Threads;
52  import org.apache.hadoop.ipc.RemoteException;
53  import org.apache.hadoop.util.StringUtils;
54  import org.htrace.Trace;
55  import org.htrace.TraceScope;
56  
57  import com.google.common.base.Preconditions;
58  
59  /**
60   * Thread that flushes cache on request
61   *
62   * NOTE: The flush handlers here are threads of their own rather than Chores
63   * because their sleep can be interrupted when there is something to do,
64   * whereas a Chore's sleep time is invariant.
65   *
66   * @see FlushRequester
67   */
68  @InterfaceAudience.Private
69  class MemStoreFlusher implements FlushRequester {
70    static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
71  
72    // These two data members go together.  Any entry in the one must have
73    // a corresponding entry in the other.
74    private final BlockingQueue<FlushQueueEntry> flushQueue =
75      new DelayQueue<FlushQueueEntry>();
76    private final Map<HRegion, FlushRegionEntry> regionsInQueue =
77      new HashMap<HRegion, FlushRegionEntry>();
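      // For illustration, a sketch of how that pairing is maintained (this mirrors
      // requestFlush() below); both structures are only touched while holding the
      // regionsInQueue monitor:
      //   synchronized (regionsInQueue) {
      //     FlushRegionEntry fqe = new FlushRegionEntry(r);
      //     regionsInQueue.put(r, fqe);  // map entry...
      //     flushQueue.add(fqe);         // ...and its matching queue entry
      //   }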
78    private AtomicBoolean wakeupPending = new AtomicBoolean();
79  
80    private final long threadWakeFrequency;
81    private final HRegionServer server;
82    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
83    private final Object blockSignal = new Object();
84  
85    protected long globalMemStoreLimit;
86    protected float globalMemStoreLimitLowMarkPercent;
87    protected long globalMemStoreLimitLowMark;
88  
89    private long blockingWaitTime;
90    private final Counter updatesBlockedMsHighWater = new Counter();
91  
92    private final FlushHandler[] flushHandlers;
93    private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
94  
95    /**
96     * @param conf the region server's configuration
97     * @param server the region server this flusher flushes for
98     */
99    public MemStoreFlusher(final Configuration conf,
100       final HRegionServer server) {
101     super();
102     this.server = server;
103     this.threadWakeFrequency =
104       conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
105     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
106     float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
107     this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
108     this.globalMemStoreLimitLowMarkPercent = 
109         HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
110     this.globalMemStoreLimitLowMark = 
111         (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
112 
113     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
114       90000);
115     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
116     this.flushHandlers = new FlushHandler[handlerCount];
117     LOG.info("globalMemStoreLimit=" +
118       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
119       ", globalMemStoreLimitLowMark=" +
120       StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
121       ", maxHeap=" + StringUtils.humanReadableInt(max));
122   }
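      // A rough worked example of the sizing above, assuming the common defaults of
      // 0.4 for the global memstore fraction and 0.95 for the lower-mark fraction
      // (the real values come from HeapMemorySizeUtil and the Configuration):
      //   long max = 4L * 1024 * 1024 * 1024;      // 4 GB heap
      //   long limit = (long) (max * 0.4f);        // ~1.6 GB high water mark
      //   long lowMark = (long) (limit * 0.95f);   // ~1.52 GB low water mark
      // Updates are blocked once usage reaches the high mark (reclaimMemStoreMemory),
      // while the flush handlers start flushing for global pressure at the low mark.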
123 
124   public Counter getUpdatesBlockedMsHighWater() {
125     return this.updatesBlockedMsHighWater;
126   }
127 
128   /**
129    * The memstore across all regions has exceeded the low water mark. Pick
130    * one region to flush and flush it synchronously (this is called from the
131    * flush thread)
132    * @return true if a region was flushed; false if no flushable region could be found
133    */
134   private boolean flushOneForGlobalPressure() {
135     SortedMap<Long, HRegion> regionsBySize =
136         server.getCopyOfOnlineRegionsSortedBySize();
137 
138     Set<HRegion> excludedRegions = new HashSet<HRegion>();
139 
140     boolean flushedOne = false;
141     while (!flushedOne) {
142       // Find the biggest region that doesn't have too many storefiles
143       // (might be null!)
144       HRegion bestFlushableRegion = getBiggestMemstoreRegion(
145           regionsBySize, excludedRegions, true);
146       // Find the biggest region overall, even if it might have too many store files.
147       HRegion bestAnyRegion = getBiggestMemstoreRegion(
148           regionsBySize, excludedRegions, false);
149 
150       if (bestAnyRegion == null) {
151         LOG.error("Above memory mark but there are no flushable regions!");
152         return false;
153       }
154 
155       HRegion regionToFlush;
156       if (bestFlushableRegion != null &&
157           bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
158         // Even if it's not supposed to be flushed, pick a region if it's more than twice
159         // as big as the best flushable one - otherwise when we're under pressure we make
160         // lots of little flushes and cause lots of compactions, etc, which just makes
161         // life worse!
162         if (LOG.isDebugEnabled()) {
163           LOG.debug("Under global heap pressure: " +
164             "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
165             "store files, but is " +
166             StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
167             " vs best flushable region's " +
168             StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
169             ". Choosing the bigger.");
170         }
171         regionToFlush = bestAnyRegion;
172       } else {
173         if (bestFlushableRegion == null) {
174           regionToFlush = bestAnyRegion;
175         } else {
176           regionToFlush = bestFlushableRegion;
177         }
178       }
179 
180       Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
181 
182       LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
183       flushedOne = flushRegion(regionToFlush, true);
184       if (!flushedOne) {
185         LOG.info("Excluding unflushable region " + regionToFlush +
186           " - trying to find a different region to flush.");
187         excludedRegions.add(regionToFlush);
188       }
189     }
190     return true;
191   }
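      // A small worked example of the heuristic above, with made-up numbers: if the
      // biggest flushable region holds 40 MB but a region skipped for having too many
      // store files holds 100 MB, then 100 MB > 2 * 40 MB and the bigger region is
      // flushed anyway; at 70 MB it would not be, and the 40 MB region would be chosen.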
192 
193   private class FlushHandler extends HasThread {
194 
195     private FlushHandler(String name) {
196       super(name);
197     }
198 
199     @Override
200     public void run() {
201       while (!server.isStopped()) {
202         FlushQueueEntry fqe = null;
203         try {
204           wakeupPending.set(false); // allow someone to wake us up again
205           fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
206           if (fqe == null || fqe instanceof WakeupFlushThread) {
207             if (isAboveLowWaterMark()) {
208               LOG.debug("Flush thread woke up because memory above low water="
209                   + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
210               if (!flushOneForGlobalPressure()) {
211                 // Wasn't able to flush any region, but we're above low water mark
212                 // This is unlikely to happen, but might happen when closing the
213                 // entire server - another thread is flushing regions. We'll just
214                 // sleep a little bit to avoid spinning, and then pretend that
215                 // we flushed one, so anyone blocked will check again
216                 Thread.sleep(1000);
217                 wakeUpIfBlocking();
218               }
219               // Enqueue another one of these tokens so we'll wake up again
220               wakeupFlushThread();
221             }
222             continue;
223           }
224           FlushRegionEntry fre = (FlushRegionEntry) fqe;
225           if (!flushRegion(fre)) {
226             break;
227           }
228         } catch (InterruptedException ex) {
229           continue;
230         } catch (ConcurrentModificationException ex) {
231           continue;
232         } catch (Exception ex) {
233           LOG.error("Cache flusher failed for entry " + fqe, ex);
234           if (!server.checkFileSystem()) {
235             break;
236           }
237         }
238       }
239       synchronized (regionsInQueue) {
240         regionsInQueue.clear();
241         flushQueue.clear();
242       }
243 
244       // Signal anyone waiting, so they see the close flag
245       wakeUpIfBlocking();
246       LOG.info(getName() + " exiting");
247     }
248   }
249 
250 
251   private void wakeupFlushThread() {
252     if (wakeupPending.compareAndSet(false, true)) {
253       flushQueue.add(new WakeupFlushThread());
254     }
255   }
256 
257   private HRegion getBiggestMemstoreRegion(
258       SortedMap<Long, HRegion> regionsBySize,
259       Set<HRegion> excludedRegions,
260       boolean checkStoreFileCount) {
261     synchronized (regionsInQueue) {
262       for (HRegion region : regionsBySize.values()) {
263         if (excludedRegions.contains(region)) {
264           continue;
265         }
266 
267         if (region.writestate.flushing || !region.writestate.writesEnabled) {
268           continue;
269         }
270 
271         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
272           continue;
273         }
274         return region;
275       }
276     }
277     return null;
278   }
279 
280   /**
281    * Return true if global memory usage is above the high watermark
282    */
283   private boolean isAboveHighWaterMark() {
284     return server.getRegionServerAccounting().
285       getGlobalMemstoreSize() >= globalMemStoreLimit;
286   }
287 
288   /**
289    * Return true if we're above the low watermark
290    */
291   private boolean isAboveLowWaterMark() {
292     return server.getRegionServerAccounting().
293       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
294   }
295 
296   public void requestFlush(HRegion r) {
297     synchronized (regionsInQueue) {
298       if (!regionsInQueue.containsKey(r)) {
299         // This entry has no delay so it will be added at the top of the flush
300         // queue.  It'll come out near immediately.
301         FlushRegionEntry fqe = new FlushRegionEntry(r);
302         this.regionsInQueue.put(r, fqe);
303         this.flushQueue.add(fqe);
304       }
305     }
306   }
307 
308   public void requestDelayedFlush(HRegion r, long delay) {
309     synchronized (regionsInQueue) {
310       if (!regionsInQueue.containsKey(r)) {
311         // This entry has some delay
312         FlushRegionEntry fqe = new FlushRegionEntry(r);
313         fqe.requeue(delay);
314         this.regionsInQueue.put(r, fqe);
315         this.flushQueue.add(fqe);
316       }
317     }
318   }
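      // Usage sketch (the caller and jitter value are hypothetical): a periodic task
      // could stagger flushes so regions do not all hit the queue at the same instant:
      //   long jitter = (long) (Math.random() * 20000);   // up to 20s of delay
      //   flusher.requestDelayedFlush(region, jitter);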
319 
320   public int getFlushQueueSize() {
321     return flushQueue.size();
322   }
323 
324   /**
325    * Only interrupt once it's done with a run through the work loop.
326    */
327   void interruptIfNecessary() {
328     lock.writeLock().lock();
329     try {
330       for (FlushHandler flushHandler : flushHandlers) {
331         if (flushHandler != null) flushHandler.interrupt();
332       }
333     } finally {
334       lock.writeLock().unlock();
335     }
336   }
337 
338   synchronized void start(UncaughtExceptionHandler eh) {
339     ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
340         server.getServerName().toShortString() + "-MemStoreFlusher", eh);
341     for (int i = 0; i < flushHandlers.length; i++) {
342       flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
343       flusherThreadFactory.newThread(flushHandlers[i]);
344       flushHandlers[i].start();
345     }
346   }
347 
348   boolean isAlive() {
349     for (FlushHandler flushHandler : flushHandlers) {
350       if (flushHandler != null && flushHandler.isAlive()) {
351         return true;
352       }
353     }
354     return false;
355   }
356 
357   void join() {
358     for (FlushHandler flushHandler : flushHandlers) {
359       if (flushHandler != null) {
360         Threads.shutdown(flushHandler.getThread());
361       }
362     }
363   }
364 
365   /*
366    * A flushRegion that checks store file count.  If too many, puts the flush
367    * on delay queue to retry later.
368    * @param fqe
369    * @return true if the region was successfully flushed, false otherwise. If
370    * false, there will be accompanying log messages explaining why the region
371    * was not flushed.
372    */
373   private boolean flushRegion(final FlushRegionEntry fqe) {
374     HRegion region = fqe.region;
375     if (!region.getRegionInfo().isMetaRegion() &&
376         isTooManyStoreFiles(region)) {
377       if (fqe.isMaximumWait(this.blockingWaitTime)) {
378         LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
379           "ms on a compaction to clean up 'too many store files'; waited " +
380           "long enough... proceeding with flush of " +
381           region.getRegionNameAsString());
382       } else {
383         // If this is first time we've been put off, then emit a log message.
384         if (fqe.getRequeueCount() <= 0) {
385           // Note: We don't impose blockingStoreFiles constraint on meta regions
386           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
387             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
388           if (!this.server.compactSplitThread.requestSplit(region)) {
389             try {
390               this.server.compactSplitThread.requestSystemCompaction(
391                   region, Thread.currentThread().getName());
392             } catch (IOException e) {
393                 e = e instanceof RemoteException ?
394                     ((RemoteException) e).unwrapRemoteException() : e;
395                 LOG.error(
396                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
397                 e);
398             }
399           }
400         }
401 
402         // Put back on the queue.  Have it come back out of the queue
403         // after a delay of this.blockingWaitTime / 100 ms.
404         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
405         // Tell a lie: the region is not flushed, but it has been requeued, so that is ok.
406         return true;
407       }
408     }
409     return flushRegion(region, false);
410   }
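      // Worked example of the delay math above, using the default
      // hbase.hstore.blockingWaitTime of 90000 ms: each requeue comes back out of the
      // delay queue after 90000 / 100 = 900 ms, and once the entry is older than
      // 90000 ms in total, isMaximumWait() trips and the flush proceeds even though
      // the region still has too many store files.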
411 
412   /*
413    * Flush a region.
414    * @param region Region to flush.
415    * @param emergencyFlush Set if we are being force flushed. If true the region
416    * needs to be removed from the flush queue. If false, we were called
417    * from the main flusher run loop and got the entry to flush by calling
418    * poll on the flush queue (which removed it).
419    *
420    * @return true if the region was successfully flushed, false otherwise. If
421    * false, there will be accompanying log messages explaining why the region
422    * was not flushed.
423    */
424   private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
425     synchronized (this.regionsInQueue) {
426       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
427       if (fqe != null && emergencyFlush) {
428         // Need to remove the region from the delay queue.  When NOT an
429         // emergencyFlush, the item was already removed via flushQueue.poll.
430         flushQueue.remove(fqe);
431       }
432     }
433     lock.readLock().lock();
434     try {
435       notifyFlushRequest(region, emergencyFlush);
436       boolean shouldCompact = region.flushcache().isCompactionNeeded();
437       // We just want to check the size
438       boolean shouldSplit = region.checkSplit() != null;
439       if (shouldSplit) {
440         this.server.compactSplitThread.requestSplit(region);
441       } else if (shouldCompact) {
442         server.compactSplitThread.requestSystemCompaction(
443             region, Thread.currentThread().getName());
444       }
445 
446     } catch (DroppedSnapshotException ex) {
447       // Cache flush can fail in a few places. If it fails in a critical
448       // section, we get a DroppedSnapshotException and a replay of hlog
449       // is required. Currently the only way to do this is a restart of
450       // the server. Abort because hdfs is probably bad (HBASE-644 is a case
451       // where hdfs was bad but passed the hdfs check).
452       server.abort("Replay of HLog required. Forcing server shutdown", ex);
453       return false;
454     } catch (IOException ex) {
455       ex = ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
456       LOG.error(
457         "Cache flush failed"
458             + (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName()))
459                 : ""), ex);
460       if (!server.checkFileSystem()) {
461         return false;
462       }
463     } finally {
464       lock.readLock().unlock();
465       wakeUpIfBlocking();
466     }
467     return true;
468   }
469 
470   private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
471     FlushType type = FlushType.NORMAL;
472     if (emergencyFlush) {
473       type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
474     }
475     for (FlushRequestListener listener : flushRequestListeners) {
476       listener.flushRequested(type, region);
477     }
478   }
479 
480   private void wakeUpIfBlocking() {
481     synchronized (blockSignal) {
482       blockSignal.notifyAll();
483     }
484   }
485 
486   private boolean isTooManyStoreFiles(HRegion region) {
487     for (Store store : region.stores.values()) {
488       if (store.hasTooManyStoreFiles()) {
489         return true;
490       }
491     }
492     return false;
493   }
494 
495   /**
496    * Check if the regionserver's memstore memory usage is greater than the
497    * limit. If so, flush regions with the biggest memstores until we're down
498    * to the lower limit. This method blocks callers until we're down to a safe
499    * amount of memstore consumption.
500    */
501   public void reclaimMemStoreMemory() {
502     TraceScope scope = Trace.startSpan("MemStoreFlusher.reclaimMemStoreMemory");
503     if (isAboveHighWaterMark()) {
504       if (Trace.isTracing()) {
505         scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
506       }
507       long start = EnvironmentEdgeManager.currentTime();
508       synchronized (this.blockSignal) {
509         boolean blocked = false;
510         long startTime = 0;
511         boolean interrupted = false;
512         try {
513           while (isAboveHighWaterMark() && !server.isStopped()) {
514             if (!blocked) {
515               startTime = EnvironmentEdgeManager.currentTime();
516               LOG.info("Blocking updates on " + server.toString() +
517                 ": the global memstore size " +
518                 StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
519                 " is >= the blocking limit " +
520                 StringUtils.humanReadableInt(globalMemStoreLimit));
521             }
522             blocked = true;
523             wakeupFlushThread();
524             try {
525               // we should be able to wait forever, but we've seen a bug where
526               // we miss a notify, so put a 5 second bound on it at least.
527               blockSignal.wait(5 * 1000);
528             } catch (InterruptedException ie) {
529               LOG.warn("Interrupted while waiting");
530               interrupted = true;
531             }
532             long took = EnvironmentEdgeManager.currentTime() - start;
533             LOG.warn("Memstore is above high water mark; updates have been blocked for " + took + "ms");
534           }
535         } finally {
536           if (interrupted) {
537             Thread.currentThread().interrupt();
538           }
539         }
540 
541         if (blocked) {
542           final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
543           if (totalTime > 0) {
544             this.updatesBlockedMsHighWater.add(totalTime);
545           }
546           LOG.info("Unblocking updates for server " + server.toString());
547         }
548       }
549     } else if (isAboveLowWaterMark()) {
550       wakeupFlushThread();
551     }
552     scope.close();
553   }
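      // Usage sketch (the caller shown is hypothetical): write paths would typically
      // call this before applying a batch, so the write blocks while memory is above
      // the high water mark:
      //   flusher.reclaimMemStoreMemory();   // may block until usage drops
      //   region.batchMutate(mutations);     // then apply the writes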
554   @Override
555   public String toString() {
556     return "flush_queue="
557         + flushQueue.size();
558   }
559 
560   public String dumpQueue() {
561     StringBuilder queueList = new StringBuilder();
562     queueList.append("Flush Queue dump:\n");
563     queueList.append("  Flush Queue:\n");
564     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
565 
566     while (it.hasNext()) {
567       queueList.append("    " + it.next().toString());
568       queueList.append("\n");
569     }
570 
571     return queueList.toString();
572   }
573 
574   /**
575    * Register a FlushRequestListener
576    * @param listener the listener to notify when a flush is requested
577    */
578   public void registerFlushRequestListener(final FlushRequestListener listener) {
579     this.flushRequestListeners.add(listener);
580   }
581 
582   /**
583    * Unregister the given FlushRequestListener
584    * @param listener the listener to remove
585    * @return true when the passed listener is unregistered successfully.
586    */
587   public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
588     return this.flushRequestListeners.remove(listener);
589   }
590 
591   /**
592    * Sets the global memstore limit to a new size.
593    * @param globalMemStoreSize the new global memstore size, in bytes
594    */
595   public void setGlobalMemstoreLimit(long globalMemStoreSize) {
596     this.globalMemStoreLimit = globalMemStoreSize;
597     this.globalMemStoreLimitLowMark = 
598         (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
599     reclaimMemStoreMemory();
600   }
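      // Worked example for the recomputation above: if the limit is lowered to 1 GB
      // and globalMemStoreLimitLowMarkPercent is 0.95, the new low mark becomes
      //   (long) (0.95f * 1024L * 1024 * 1024)   // ~972 MB
      // and reclaimMemStoreMemory() runs so that writers block right away if current
      // usage already exceeds the new, smaller limit.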
601 
602   public long getMemoryLimit() {
603     return this.globalMemStoreLimit;
604   }
605 
606   interface FlushQueueEntry extends Delayed {}
607 
608   /**
609    * Token to insert into the flush queue that ensures that the flusher does not sleep
610    */
611   static class WakeupFlushThread implements FlushQueueEntry {
612     @Override
613     public long getDelay(TimeUnit unit) {
614       return 0;
615     }
616 
617     @Override
618     public int compareTo(Delayed o) {
619       return -1;
620     }
621 
622     @Override
623     public boolean equals(Object obj) {
624       return (this == obj);
625     }
626 
627   }
628 
629   /**
630    * Datastructure used in the flush queue.  Holds region and retry count.
631    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
632    * construction, the delay is zero. When added to a delay queue, we'll come
633    * out near immediately.  Call {@link #requeue(long)} passing delay in
634    * milliseconds before re-adding to the delay queue if you want it to stay there
635    * a while.
636    */
637   static class FlushRegionEntry implements FlushQueueEntry {
638     private final HRegion region;
639 
640     private final long createTime;
641     private long whenToExpire;
642     private int requeueCount = 0;
643 
644     FlushRegionEntry(final HRegion r) {
645       this.region = r;
646       this.createTime = EnvironmentEdgeManager.currentTime();
647       this.whenToExpire = this.createTime;
648     }
649 
650     /**
651      * @param maximumWait the maximum time to have waited, in milliseconds
652      * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
653      */
654     public boolean isMaximumWait(final long maximumWait) {
655       return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
656     }
657 
658     /**
659      * @return Count of times {@link #requeue(long)} was called; i.e. this is the
660      * number of times we've been requeued.
661      */
662     public int getRequeueCount() {
663       return this.requeueCount;
664     }
665 
666     /**
667      * @param when When to expire, when to come up out of the queue.
668      * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
669      * to whatever you pass.
670      * @return This.
671      */
672     public FlushRegionEntry requeue(final long when) {
673       this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
674       this.requeueCount++;
675       return this;
676     }
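        // Illustrative note on how this interacts with the DelayQueue: a fresh entry
        // has whenToExpire == createTime, so getDelay() is <= 0 and it is available
        // to poll() immediately; after
        //   fqe.requeue(900);
        // getDelay(TimeUnit.MILLISECONDS) is roughly 900, and the queue holds the
        // entry back for about that long before a FlushHandler can poll it.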
677 
678     @Override
679     public long getDelay(TimeUnit unit) {
680       return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
681           TimeUnit.MILLISECONDS);
682     }
683 
684     @Override
685     public int compareTo(Delayed other) {
686       return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
687         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
688     }
689 
690     @Override
691     public String toString() {
692       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
693     }
694 
695     @Override
696     public int hashCode() {
697       return (int) getDelay(TimeUnit.MILLISECONDS);
698     }
699 
700     @Override
701     public boolean equals(Object obj) {
702       if (this == obj) {
703         return true;
704       }
705       if (obj == null || getClass() != obj.getClass()) {
706         return false;
707       }
708       Delayed other = (Delayed) obj;
709       return compareTo(other) == 0;
710     }
711   }
712 }
713 
714 enum FlushType {
715   NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
716 }