1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.util.StringUtils.humanReadableInt;
22
23 import java.io.IOException;
24 import java.lang.Thread.UncaughtExceptionHandler;
25 import java.lang.management.ManagementFactory;
26 import java.util.ArrayList;
27 import java.util.ConcurrentModificationException;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.SortedMap;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.DelayQueue;
36 import java.util.concurrent.Delayed;
37 import java.util.concurrent.ThreadFactory;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.locks.ReentrantReadWriteLock;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.hbase.DroppedSnapshotException;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.RemoteExceptionHandler;
48 import org.apache.hadoop.hbase.classification.InterfaceAudience;
49 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
50 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
51 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54 import org.apache.hadoop.hbase.util.HasThread;
55 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
56 import org.apache.hadoop.hbase.util.Threads;
57 import org.apache.hadoop.util.StringUtils;
58 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
59 import org.apache.htrace.Trace;
60 import org.apache.htrace.TraceScope;
61 import org.apache.hadoop.hbase.util.Counter;
62
63 import com.google.common.base.Preconditions;
64
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.Private
75 class MemStoreFlusher implements FlushRequester {
76 static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
77
78 private Configuration conf;
79
80
81 private final BlockingQueue<FlushQueueEntry> flushQueue =
82 new DelayQueue<FlushQueueEntry>();
83 private final Map<Region, FlushRegionEntry> regionsInQueue =
84 new HashMap<Region, FlushRegionEntry>();
85 private AtomicBoolean wakeupPending = new AtomicBoolean();
86
87 private final long threadWakeFrequency;
88 private final HRegionServer server;
89 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
90 private final Object blockSignal = new Object();
91
92 protected long globalMemStoreLimit;
93 protected float globalMemStoreLimitLowMarkPercent;
94 protected long globalMemStoreLimitLowMark;
95
96 private long blockingWaitTime;
97 private final Counter updatesBlockedMsHighWater = new Counter();
98
99 private final FlushHandler[] flushHandlers;
100 private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
101
102
103
104
105
106 public MemStoreFlusher(final Configuration conf,
107 final HRegionServer server) {
108 super();
109 this.conf = conf;
110 this.server = server;
111 this.threadWakeFrequency =
112 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
113 long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
114 float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
115 this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
116 this.globalMemStoreLimitLowMarkPercent =
117 HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
118 this.globalMemStoreLimitLowMark =
119 (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
120
121 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
122 90000);
123 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
124 this.flushHandlers = new FlushHandler[handlerCount];
125 LOG.info("globalMemStoreLimit="
126 + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
127 + ", globalMemStoreLimitLowMark="
128 + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
129 + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
130 }
131
132 public Counter getUpdatesBlockedMsHighWater() {
133 return this.updatesBlockedMsHighWater;
134 }
135
136
137
138
139
140
141
142 private boolean flushOneForGlobalPressure() {
143 SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
144 Set<Region> excludedRegions = new HashSet<Region>();
145
146 double secondaryMultiplier
147 = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
148
149 boolean flushedOne = false;
150 while (!flushedOne) {
151
152
153 Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true);
154
155 Region bestAnyRegion = getBiggestMemstoreRegion(
156 regionsBySize, excludedRegions, false);
157
158 Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
159 excludedRegions);
160
161 if (bestAnyRegion == null && bestRegionReplica == null) {
162 LOG.error("Above memory mark but there are no flushable regions!");
163 return false;
164 }
165
166 Region regionToFlush;
167 if (bestFlushableRegion != null &&
168 bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) {
169
170
171
172
173 if (LOG.isDebugEnabled()) {
174 LOG.debug("Under global heap pressure: " + "Region "
175 + bestAnyRegion.getRegionInfo().getRegionNameAsString()
176 + " has too many " + "store files, but is "
177 + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemstoreSize(), "", 1)
178 + " vs best flushable region's "
179 + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemstoreSize(), "", 1)
180 + ". Choosing the bigger.");
181 }
182 regionToFlush = bestAnyRegion;
183 } else {
184 if (bestFlushableRegion == null) {
185 regionToFlush = bestAnyRegion;
186 } else {
187 regionToFlush = bestFlushableRegion;
188 }
189 }
190
191 Preconditions.checkState(
192 (regionToFlush != null && regionToFlush.getMemstoreSize() > 0) ||
193 (bestRegionReplica != null && bestRegionReplica.getMemstoreSize() > 0));
194
195 if (regionToFlush == null ||
196 (bestRegionReplica != null &&
197 ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
198 (bestRegionReplica.getMemstoreSize()
199 > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
200 LOG.info("Refreshing storefiles of region " + bestRegionReplica +
201 " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
202 server.getRegionServerAccounting().getGlobalMemstoreSize()));
203 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
204 if (!flushedOne) {
205 LOG.info("Excluding secondary region " + bestRegionReplica +
206 " - trying to find a different region to refresh files.");
207 excludedRegions.add(bestRegionReplica);
208 }
209 } else {
210 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
211 + "Total Memstore size="
212 + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
213 + ", Region memstore size="
214 + humanReadableInt(regionToFlush.getMemstoreSize()));
215 flushedOne = flushRegion(regionToFlush, true, true);
216
217 if (!flushedOne) {
218 LOG.info("Excluding unflushable region " + regionToFlush +
219 " - trying to find a different region to flush.");
220 excludedRegions.add(regionToFlush);
221 }
222 }
223 }
224 return true;
225 }
226
227 private class FlushHandler extends HasThread {
228
229 private FlushHandler(String name) {
230 super(name);
231 }
232
233 @Override
234 public void run() {
235 while (!server.isStopped()) {
236 FlushQueueEntry fqe = null;
237 try {
238 wakeupPending.set(false);
239 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
240 if (fqe == null || fqe instanceof WakeupFlushThread) {
241 if (isAboveLowWaterMark()) {
242 LOG.debug("Flush thread woke up because memory above low water="
243 + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
244 if (!flushOneForGlobalPressure()) {
245
246
247
248
249
250 Thread.sleep(1000);
251 wakeUpIfBlocking();
252 }
253
254 wakeupFlushThread();
255 }
256 continue;
257 }
258 FlushRegionEntry fre = (FlushRegionEntry) fqe;
259 if (!flushRegion(fre)) {
260 break;
261 }
262 } catch (InterruptedException ex) {
263 continue;
264 } catch (ConcurrentModificationException ex) {
265 continue;
266 } catch (Exception ex) {
267 LOG.error("Cache flusher failed for entry " + fqe, ex);
268 if (!server.checkFileSystem()) {
269 break;
270 }
271 }
272 }
273 synchronized (regionsInQueue) {
274 regionsInQueue.clear();
275 flushQueue.clear();
276 }
277
278
279 wakeUpIfBlocking();
280 LOG.info(getName() + " exiting");
281 }
282 }
283
284
285 private void wakeupFlushThread() {
286 if (wakeupPending.compareAndSet(false, true)) {
287 flushQueue.add(new WakeupFlushThread());
288 }
289 }
290
291 private Region getBiggestMemstoreRegion(
292 SortedMap<Long, Region> regionsBySize,
293 Set<Region> excludedRegions,
294 boolean checkStoreFileCount) {
295 synchronized (regionsInQueue) {
296 for (Region region : regionsBySize.values()) {
297 if (excludedRegions.contains(region)) {
298 continue;
299 }
300
301 if (((HRegion)region).writestate.flushing ||
302 !((HRegion)region).writestate.writesEnabled) {
303 continue;
304 }
305
306 if (checkStoreFileCount && isTooManyStoreFiles(region)) {
307 continue;
308 }
309 return region;
310 }
311 }
312 return null;
313 }
314
315 private Region getBiggestMemstoreOfRegionReplica(SortedMap<Long, Region> regionsBySize,
316 Set<Region> excludedRegions) {
317 synchronized (regionsInQueue) {
318 for (Region region : regionsBySize.values()) {
319 if (excludedRegions.contains(region)) {
320 continue;
321 }
322
323 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
324 continue;
325 }
326
327 return region;
328 }
329 }
330 return null;
331 }
332
333 private boolean refreshStoreFilesAndReclaimMemory(Region region) {
334 try {
335 return region.refreshStoreFiles();
336 } catch (IOException e) {
337 LOG.warn("Refreshing store files failed with exception", e);
338 }
339 return false;
340 }
341
342
343
344
345 private boolean isAboveHighWaterMark() {
346 return server.getRegionServerAccounting().
347 getGlobalMemstoreSize() >= globalMemStoreLimit;
348 }
349
350
351
352
353 private boolean isAboveLowWaterMark() {
354 return server.getRegionServerAccounting().
355 getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
356 }
357
358 @Override
359 public void requestFlush(Region r, boolean forceFlushAllStores) {
360 synchronized (regionsInQueue) {
361 if (!regionsInQueue.containsKey(r)) {
362
363
364 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
365 this.regionsInQueue.put(r, fqe);
366 this.flushQueue.add(fqe);
367 }
368 }
369 }
370
371 @Override
372 public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
373 synchronized (regionsInQueue) {
374 if (!regionsInQueue.containsKey(r)) {
375
376 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
377 fqe.requeue(delay);
378 this.regionsInQueue.put(r, fqe);
379 this.flushQueue.add(fqe);
380 }
381 }
382 }
383
384 public int getFlushQueueSize() {
385 return flushQueue.size();
386 }
387
388
389
390
391 void interruptIfNecessary() {
392 lock.writeLock().lock();
393 try {
394 for (FlushHandler flushHander : flushHandlers) {
395 if (flushHander != null) flushHander.interrupt();
396 }
397 } finally {
398 lock.writeLock().unlock();
399 }
400 }
401
402 synchronized void start(UncaughtExceptionHandler eh) {
403 ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
404 server.getServerName().toShortString() + "-MemStoreFlusher", eh);
405 for (int i = 0; i < flushHandlers.length; i++) {
406 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
407 flusherThreadFactory.newThread(flushHandlers[i]);
408 flushHandlers[i].start();
409 }
410 }
411
412 boolean isAlive() {
413 for (FlushHandler flushHander : flushHandlers) {
414 if (flushHander != null && flushHander.isAlive()) {
415 return true;
416 }
417 }
418 return false;
419 }
420
421 void join() {
422 for (FlushHandler flushHander : flushHandlers) {
423 if (flushHander != null) {
424 Threads.shutdown(flushHander.getThread());
425 }
426 }
427 }
428
429
430
431
432
433
434
435
436
437 private boolean flushRegion(final FlushRegionEntry fqe) {
438 Region region = fqe.region;
439 if (!region.getRegionInfo().isMetaRegion() &&
440 isTooManyStoreFiles(region)) {
441 if (fqe.isMaximumWait(this.blockingWaitTime)) {
442 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
443 "ms on a compaction to clean up 'too many store files'; waited " +
444 "long enough... proceeding with flush of " +
445 region.getRegionInfo().getRegionNameAsString());
446 } else {
447
448 if (fqe.getRequeueCount() <= 0) {
449
450 LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + " has too many " +
451 "store files; delaying flush up to " + this.blockingWaitTime + "ms");
452 if (!this.server.compactSplitThread.requestSplit(region)) {
453 try {
454 this.server.compactSplitThread.requestSystemCompaction(
455 region, Thread.currentThread().getName());
456 } catch (IOException e) {
457 LOG.error("Cache flush failed for region " +
458 Bytes.toStringBinary(region.getRegionInfo().getRegionName()),
459 RemoteExceptionHandler.checkIOException(e));
460 }
461 }
462 }
463
464
465
466 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
467
468 return true;
469 }
470 }
471 return flushRegion(region, false, fqe.isForceFlushAllStores());
472 }
473
474
475
476
477
478
479
480
481
482
483
484
485
486 private boolean flushRegion(final Region region, final boolean emergencyFlush,
487 boolean forceFlushAllStores) {
488 long startTime = 0;
489 synchronized (this.regionsInQueue) {
490 FlushRegionEntry fqe = this.regionsInQueue.remove(region);
491
492 if (fqe != null) {
493 startTime = fqe.createTime;
494 }
495 if (fqe != null && emergencyFlush) {
496
497
498 flushQueue.remove(fqe);
499 }
500 }
501 if (startTime == 0) {
502
503
504
505 startTime = EnvironmentEdgeManager.currentTime();
506 }
507 lock.readLock().lock();
508 try {
509 notifyFlushRequest(region, emergencyFlush);
510 FlushResult flushResult = region.flush(forceFlushAllStores);
511 boolean shouldCompact = flushResult.isCompactionNeeded();
512
513 boolean shouldSplit = ((HRegion)region).checkSplit() != null;
514 if (shouldSplit) {
515 this.server.compactSplitThread.requestSplit(region);
516 } else if (shouldCompact) {
517 server.compactSplitThread.requestSystemCompaction(
518 region, Thread.currentThread().getName());
519 }
520 if (flushResult.isFlushSucceeded()) {
521 long endTime = EnvironmentEdgeManager.currentTime();
522 server.metricsRegionServer.updateFlushTime(endTime - startTime);
523 }
524 } catch (DroppedSnapshotException ex) {
525
526
527
528
529
530 server.abort("Replay of WAL required. Forcing server shutdown", ex);
531 return false;
532 } catch (IOException ex) {
533 LOG.error("Cache flush failed" + (region != null ? (" for region " +
534 Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
535 RemoteExceptionHandler.checkIOException(ex));
536 if (!server.checkFileSystem()) {
537 return false;
538 }
539 } finally {
540 lock.readLock().unlock();
541 wakeUpIfBlocking();
542 }
543 return true;
544 }
545
546 private void notifyFlushRequest(Region region, boolean emergencyFlush) {
547 FlushType type = FlushType.NORMAL;
548 if (emergencyFlush) {
549 type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
550 }
551 for (FlushRequestListener listener : flushRequestListeners) {
552 listener.flushRequested(type, region);
553 }
554 }
555
556 private void wakeUpIfBlocking() {
557 synchronized (blockSignal) {
558 blockSignal.notifyAll();
559 }
560 }
561
562 private boolean isTooManyStoreFiles(Region region) {
563 for (Store store : region.getStores()) {
564 if (store.hasTooManyStoreFiles()) {
565 return true;
566 }
567 }
568 return false;
569 }
570
571
572
573
574
575
576
577 public void reclaimMemStoreMemory() {
578 TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
579 if (isAboveHighWaterMark()) {
580 if (Trace.isTracing()) {
581 scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
582 }
583 long start = EnvironmentEdgeManager.currentTime();
584 synchronized (this.blockSignal) {
585 boolean blocked = false;
586 long startTime = 0;
587 boolean interrupted = false;
588 try {
589 while (isAboveHighWaterMark() && !server.isStopped()) {
590 if (!blocked) {
591 startTime = EnvironmentEdgeManager.currentTime();
592 LOG.info("Blocking updates on "
593 + server.toString()
594 + ": the global memstore size "
595 + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
596 .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
597 + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
598 }
599 blocked = true;
600 wakeupFlushThread();
601 try {
602
603
604 blockSignal.wait(5 * 1000);
605 } catch (InterruptedException ie) {
606 LOG.warn("Interrupted while waiting");
607 interrupted = true;
608 }
609 long took = EnvironmentEdgeManager.currentTime() - start;
610 LOG.warn("Memstore is above high water mark and block " + took + "ms");
611 }
612 } finally {
613 if (interrupted) {
614 Thread.currentThread().interrupt();
615 }
616 }
617
618 if(blocked){
619 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
620 if(totalTime > 0){
621 this.updatesBlockedMsHighWater.add(totalTime);
622 }
623 LOG.info("Unblocking updates for server " + server.toString());
624 }
625 }
626 } else if (isAboveLowWaterMark()) {
627 wakeupFlushThread();
628 }
629 scope.close();
630 }
631 @Override
632 public String toString() {
633 return "flush_queue="
634 + flushQueue.size();
635 }
636
637 public String dumpQueue() {
638 StringBuilder queueList = new StringBuilder();
639 queueList.append("Flush Queue Queue dump:\n");
640 queueList.append(" Flush Queue:\n");
641 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
642
643 while(it.hasNext()){
644 queueList.append(" "+it.next().toString());
645 queueList.append("\n");
646 }
647
648 return queueList.toString();
649 }
650
651
652
653
654
655 @Override
656 public void registerFlushRequestListener(final FlushRequestListener listener) {
657 this.flushRequestListeners.add(listener);
658 }
659
660
661
662
663
664
665 @Override
666 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
667 return this.flushRequestListeners.remove(listener);
668 }
669
670
671
672
673
674 @Override
675 public void setGlobalMemstoreLimit(long globalMemStoreSize) {
676 this.globalMemStoreLimit = globalMemStoreSize;
677 this.globalMemStoreLimitLowMark =
678 (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
679 reclaimMemStoreMemory();
680 }
681
682 public long getMemoryLimit() {
683 return this.globalMemStoreLimit;
684 }
685
686 interface FlushQueueEntry extends Delayed {
687 }
688
689
690
691
692 static class WakeupFlushThread implements FlushQueueEntry {
693 @Override
694 public long getDelay(TimeUnit unit) {
695 return 0;
696 }
697
698 @Override
699 public int compareTo(Delayed o) {
700 return -1;
701 }
702
703 @Override
704 public boolean equals(Object obj) {
705 return (this == obj);
706 }
707 }
708
709
710
711
712
713
714
715
716
717 static class FlushRegionEntry implements FlushQueueEntry {
718 private final Region region;
719
720 private final long createTime;
721 private long whenToExpire;
722 private int requeueCount = 0;
723
724 private boolean forceFlushAllStores;
725
726 FlushRegionEntry(final Region r, boolean forceFlushAllStores) {
727 this.region = r;
728 this.createTime = EnvironmentEdgeManager.currentTime();
729 this.whenToExpire = this.createTime;
730 this.forceFlushAllStores = forceFlushAllStores;
731 }
732
733
734
735
736
737 public boolean isMaximumWait(final long maximumWait) {
738 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
739 }
740
741
742
743
744
745 public int getRequeueCount() {
746 return this.requeueCount;
747 }
748
749
750
751
752 public boolean isForceFlushAllStores() {
753 return forceFlushAllStores;
754 }
755
756
757
758
759
760
761
762 public FlushRegionEntry requeue(final long when) {
763 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
764 this.requeueCount++;
765 return this;
766 }
767
768 @Override
769 public long getDelay(TimeUnit unit) {
770 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
771 TimeUnit.MILLISECONDS);
772 }
773
774 @Override
775 public int compareTo(Delayed other) {
776
777 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
778 other.getDelay(TimeUnit.MILLISECONDS)).intValue();
779 if (ret != 0) {
780 return ret;
781 }
782 FlushQueueEntry otherEntry = (FlushQueueEntry) other;
783 return hashCode() - otherEntry.hashCode();
784 }
785
786 @Override
787 public String toString() {
788 return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]";
789 }
790
791 @Override
792 public int hashCode() {
793 int hash = (int) getDelay(TimeUnit.MILLISECONDS);
794 return hash ^ region.hashCode();
795 }
796
797 @Override
798 public boolean equals(Object obj) {
799 if (this == obj) {
800 return true;
801 }
802 if (obj == null || getClass() != obj.getClass()) {
803 return false;
804 }
805 Delayed other = (Delayed) obj;
806 return compareTo(other) == 0;
807 }
808 }
809 }
810
811 enum FlushType {
812 NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
813 }