1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.util.StringUtils.humanReadableInt;
22
23 import java.io.IOException;
24 import java.lang.Thread.UncaughtExceptionHandler;
25 import java.lang.management.MemoryUsage;
26 import java.util.ArrayList;
27 import java.util.ConcurrentModificationException;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.SortedMap;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.DelayQueue;
36 import java.util.concurrent.Delayed;
37 import java.util.concurrent.ThreadFactory;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.locks.ReentrantReadWriteLock;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.hbase.DroppedSnapshotException;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.RemoteExceptionHandler;
48 import org.apache.hadoop.hbase.classification.InterfaceAudience;
49 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
50 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
51 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
54 import org.apache.hadoop.hbase.util.HasThread;
55 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
56 import org.apache.hadoop.hbase.util.Threads;
57 import org.apache.hadoop.util.StringUtils;
58 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
59 import org.apache.htrace.Trace;
60 import org.apache.htrace.TraceScope;
61 import org.apache.hadoop.hbase.util.Counter;
62
63 import com.google.common.base.Preconditions;
64
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.Private
75 class MemStoreFlusher implements FlushRequester {
76 private static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
77
78 private Configuration conf;
79
80
81 private final BlockingQueue<FlushQueueEntry> flushQueue =
82 new DelayQueue<FlushQueueEntry>();
83 private final Map<Region, FlushRegionEntry> regionsInQueue =
84 new HashMap<Region, FlushRegionEntry>();
85 private AtomicBoolean wakeupPending = new AtomicBoolean();
86
87 private final long threadWakeFrequency;
88 private final HRegionServer server;
89 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
90 private final Object blockSignal = new Object();
91
92 protected long globalMemStoreLimit;
93 protected float globalMemStoreLimitLowMarkPercent;
94 protected long globalMemStoreLimitLowMark;
95
96 private long blockingWaitTime;
97 private final Counter updatesBlockedMsHighWater = new Counter();
98
99 private final FlushHandler[] flushHandlers;
100 private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
101
102
103
104
105
/**
 * Creates a flusher for the given region server. Reads the wake frequency, the global
 * memstore limits, the blocking wait time and the number of flush handlers from
 * {@code conf}. No threads are created here; see {@code start}.
 *
 * @param conf configuration to read the settings from
 * @param server the region server whose memstores are flushed
 */
public MemStoreFlusher(final Configuration conf,
    final HRegionServer server) {
  this.conf = conf;
  this.server = server;
  this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);

  // The heap usage may be unavailable; -1 then flows into the limits computed below.
  final MemoryUsage heapUsage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
  final long maxHeap = (heapUsage == null) ? -1L : heapUsage.getMax();

  final float memStoreFraction = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
  this.globalMemStoreLimit = (long) (maxHeap * memStoreFraction);
  this.globalMemStoreLimitLowMarkPercent =
      HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, memStoreFraction);
  this.globalMemStoreLimitLowMark =
      (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);

  this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
  final int flusherCount = conf.getInt("hbase.hstore.flusher.count", 2);
  this.flushHandlers = new FlushHandler[flusherCount];

  final String limitText = TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1);
  final String lowMarkText =
      TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1);
  final String maxHeapText = TraditionalBinaryPrefix.long2String(maxHeap, "", 1);
  LOG.info("globalMemStoreLimit=" + limitText
      + ", globalMemStoreLimitLowMark=" + lowMarkText
      + ", maxHeap=" + maxHeapText);
}
135
136 public Counter getUpdatesBlockedMsHighWater() {
137 return this.updatesBlockedMsHighWater;
138 }
139
140
141
142
143
144
145
146 private boolean flushOneForGlobalPressure() {
147 SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
148 Set<Region> excludedRegions = new HashSet<Region>();
149
150 double secondaryMultiplier
151 = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
152
153 boolean flushedOne = false;
154 while (!flushedOne) {
155
156
157 Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true);
158
159 Region bestAnyRegion = getBiggestMemstoreRegion(
160 regionsBySize, excludedRegions, false);
161
162 Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
163 excludedRegions);
164
165 if (bestAnyRegion == null && bestRegionReplica == null) {
166 LOG.error("Above memory mark but there are no flushable regions!");
167 return false;
168 }
169
170 Region regionToFlush;
171 if (bestFlushableRegion != null &&
172 bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) {
173
174
175
176
177 if (LOG.isDebugEnabled()) {
178 LOG.debug("Under global heap pressure: " + "Region "
179 + bestAnyRegion.getRegionInfo().getRegionNameAsString()
180 + " has too many " + "store files, but is "
181 + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemstoreSize(), "", 1)
182 + " vs best flushable region's "
183 + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemstoreSize(), "", 1)
184 + ". Choosing the bigger.");
185 }
186 regionToFlush = bestAnyRegion;
187 } else {
188 if (bestFlushableRegion == null) {
189 regionToFlush = bestAnyRegion;
190 } else {
191 regionToFlush = bestFlushableRegion;
192 }
193 }
194
195 Preconditions.checkState(
196 (regionToFlush != null && regionToFlush.getMemstoreSize() > 0) ||
197 (bestRegionReplica != null && bestRegionReplica.getMemstoreSize() > 0));
198
199 if (regionToFlush == null ||
200 (bestRegionReplica != null &&
201 ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
202 (bestRegionReplica.getMemstoreSize()
203 > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
204 LOG.info("Refreshing storefiles of region " + bestRegionReplica +
205 " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
206 server.getRegionServerAccounting().getGlobalMemstoreSize()));
207 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
208 if (!flushedOne) {
209 LOG.info("Excluding secondary region " + bestRegionReplica +
210 " - trying to find a different region to refresh files.");
211 excludedRegions.add(bestRegionReplica);
212 }
213 } else {
214 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
215 + "Total Memstore size="
216 + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
217 + ", Region memstore size="
218 + humanReadableInt(regionToFlush.getMemstoreSize()));
219 flushedOne = flushRegion(regionToFlush, true, true);
220
221 if (!flushedOne) {
222 LOG.info("Excluding unflushable region " + regionToFlush +
223 " - trying to find a different region to flush.");
224 excludedRegions.add(regionToFlush);
225 }
226 }
227 }
228 return true;
229 }
230
231 private class FlushHandler extends HasThread {
232
233 private FlushHandler(String name) {
234 super(name);
235 }
236
237 @Override
238 public void run() {
239 while (!server.isStopped()) {
240 FlushQueueEntry fqe = null;
241 try {
242 wakeupPending.set(false);
243 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
244 if (fqe == null || fqe instanceof WakeupFlushThread) {
245 if (isAboveLowWaterMark()) {
246 LOG.debug("Flush thread woke up because memory above low water="
247 + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
248 if (!flushOneForGlobalPressure()) {
249
250
251
252
253
254 Thread.sleep(1000);
255 wakeUpIfBlocking();
256 }
257
258 wakeupFlushThread();
259 }
260 continue;
261 }
262 FlushRegionEntry fre = (FlushRegionEntry) fqe;
263 if (!flushRegion(fre)) {
264 break;
265 }
266 } catch (InterruptedException ex) {
267 continue;
268 } catch (ConcurrentModificationException ex) {
269 continue;
270 } catch (Exception ex) {
271 LOG.error("Cache flusher failed for entry " + fqe, ex);
272 if (!server.checkFileSystem()) {
273 break;
274 }
275 }
276 }
277 synchronized (regionsInQueue) {
278 regionsInQueue.clear();
279 flushQueue.clear();
280 }
281
282
283 wakeUpIfBlocking();
284 LOG.info(getName() + " exiting");
285 }
286 }
287
288
289 private void wakeupFlushThread() {
290 if (wakeupPending.compareAndSet(false, true)) {
291 flushQueue.add(new WakeupFlushThread());
292 }
293 }
294
295 private Region getBiggestMemstoreRegion(
296 SortedMap<Long, Region> regionsBySize,
297 Set<Region> excludedRegions,
298 boolean checkStoreFileCount) {
299 synchronized (regionsInQueue) {
300 for (Region region : regionsBySize.values()) {
301 if (excludedRegions.contains(region)) {
302 continue;
303 }
304
305 if (((HRegion)region).writestate.flushing ||
306 !((HRegion)region).writestate.writesEnabled) {
307 continue;
308 }
309
310 if (checkStoreFileCount && isTooManyStoreFiles(region)) {
311 continue;
312 }
313 return region;
314 }
315 }
316 return null;
317 }
318
319 private Region getBiggestMemstoreOfRegionReplica(SortedMap<Long, Region> regionsBySize,
320 Set<Region> excludedRegions) {
321 synchronized (regionsInQueue) {
322 for (Region region : regionsBySize.values()) {
323 if (excludedRegions.contains(region)) {
324 continue;
325 }
326
327 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
328 continue;
329 }
330
331 return region;
332 }
333 }
334 return null;
335 }
336
337 private boolean refreshStoreFilesAndReclaimMemory(Region region) {
338 try {
339 return region.refreshStoreFiles();
340 } catch (IOException e) {
341 LOG.warn("Refreshing store files failed with exception", e);
342 }
343 return false;
344 }
345
346
347
348
349 private boolean isAboveHighWaterMark() {
350 return server.getRegionServerAccounting().
351 getGlobalMemstoreSize() >= globalMemStoreLimit;
352 }
353
354
355
356
357 private boolean isAboveLowWaterMark() {
358 return server.getRegionServerAccounting().
359 getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
360 }
361
362 @Override
363 public void requestFlush(Region r, boolean forceFlushAllStores) {
364 synchronized (regionsInQueue) {
365 if (!regionsInQueue.containsKey(r)) {
366
367
368 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
369 this.regionsInQueue.put(r, fqe);
370 this.flushQueue.add(fqe);
371 }
372 }
373 }
374
375 @Override
376 public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
377 synchronized (regionsInQueue) {
378 if (!regionsInQueue.containsKey(r)) {
379
380 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
381 fqe.requeue(delay);
382 this.regionsInQueue.put(r, fqe);
383 this.flushQueue.add(fqe);
384 }
385 }
386 }
387
388 public int getFlushQueueSize() {
389 return flushQueue.size();
390 }
391
392
393
394
395 void interruptIfNecessary() {
396 lock.writeLock().lock();
397 try {
398 for (FlushHandler flushHander : flushHandlers) {
399 if (flushHander != null) flushHander.interrupt();
400 }
401 } finally {
402 lock.writeLock().unlock();
403 }
404 }
405
406 synchronized void start(UncaughtExceptionHandler eh) {
407 ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
408 server.getServerName().toShortString() + "-MemStoreFlusher", eh);
409 for (int i = 0; i < flushHandlers.length; i++) {
410 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
411 flusherThreadFactory.newThread(flushHandlers[i]);
412 flushHandlers[i].start();
413 }
414 }
415
416 boolean isAlive() {
417 for (FlushHandler flushHander : flushHandlers) {
418 if (flushHander != null && flushHander.isAlive()) {
419 return true;
420 }
421 }
422 return false;
423 }
424
425 void join() {
426 for (FlushHandler flushHander : flushHandlers) {
427 if (flushHander != null) {
428 Threads.shutdown(flushHander.getThread());
429 }
430 }
431 }
432
433
434
435
436
437
438
439
440
441 private boolean flushRegion(final FlushRegionEntry fqe) {
442 Region region = fqe.region;
443 if (!region.getRegionInfo().isMetaRegion() &&
444 isTooManyStoreFiles(region)) {
445 if (fqe.isMaximumWait(this.blockingWaitTime)) {
446 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
447 "ms on a compaction to clean up 'too many store files'; waited " +
448 "long enough... proceeding with flush of " +
449 region.getRegionInfo().getRegionNameAsString());
450 } else {
451
452 if (fqe.getRequeueCount() <= 0) {
453
454 LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + " has too many " +
455 "store files; delaying flush up to " + this.blockingWaitTime + "ms");
456 if (!this.server.compactSplitThread.requestSplit(region)) {
457 try {
458 this.server.compactSplitThread.requestSystemCompaction(
459 region, Thread.currentThread().getName());
460 } catch (IOException e) {
461 LOG.error("Cache flush failed for region " +
462 Bytes.toStringBinary(region.getRegionInfo().getRegionName()),
463 RemoteExceptionHandler.checkIOException(e));
464 }
465 }
466 }
467
468
469
470 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
471
472 return true;
473 }
474 }
475 return flushRegion(region, false, fqe.isForceFlushAllStores());
476 }
477
478
479
480
481
482
483
484
485
486
487
488
489
490 private boolean flushRegion(final Region region, final boolean emergencyFlush,
491 boolean forceFlushAllStores) {
492 long startTime = 0;
493 synchronized (this.regionsInQueue) {
494 FlushRegionEntry fqe = this.regionsInQueue.remove(region);
495
496 if (fqe != null) {
497 startTime = fqe.createTime;
498 }
499 if (fqe != null && emergencyFlush) {
500
501
502 flushQueue.remove(fqe);
503 }
504 }
505 if (startTime == 0) {
506
507
508
509 startTime = EnvironmentEdgeManager.currentTime();
510 }
511 lock.readLock().lock();
512 try {
513 notifyFlushRequest(region, emergencyFlush);
514 FlushResult flushResult = region.flush(forceFlushAllStores);
515 boolean shouldCompact = flushResult.isCompactionNeeded();
516
517 boolean shouldSplit = ((HRegion)region).checkSplit() != null;
518 if (shouldSplit) {
519 this.server.compactSplitThread.requestSplit(region);
520 } else if (shouldCompact) {
521 server.compactSplitThread.requestSystemCompaction(
522 region, Thread.currentThread().getName());
523 }
524 if (flushResult.isFlushSucceeded()) {
525 long endTime = EnvironmentEdgeManager.currentTime();
526 server.metricsRegionServer.updateFlushTime(endTime - startTime);
527 }
528 } catch (DroppedSnapshotException ex) {
529
530
531
532
533
534 server.abort("Replay of WAL required. Forcing server shutdown", ex);
535 return false;
536 } catch (IOException ex) {
537 LOG.error("Cache flush failed" + (region != null ? (" for region " +
538 Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
539 RemoteExceptionHandler.checkIOException(ex));
540 if (!server.checkFileSystem()) {
541 return false;
542 }
543 } finally {
544 lock.readLock().unlock();
545 wakeUpIfBlocking();
546 }
547 return true;
548 }
549
550 private void notifyFlushRequest(Region region, boolean emergencyFlush) {
551 FlushType type = FlushType.NORMAL;
552 if (emergencyFlush) {
553 type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
554 }
555 for (FlushRequestListener listener : flushRequestListeners) {
556 listener.flushRequested(type, region);
557 }
558 }
559
560 private void wakeUpIfBlocking() {
561 synchronized (blockSignal) {
562 blockSignal.notifyAll();
563 }
564 }
565
566 private boolean isTooManyStoreFiles(Region region) {
567 for (Store store : region.getStores()) {
568 if (store.hasTooManyStoreFiles()) {
569 return true;
570 }
571 }
572 return false;
573 }
574
575
576
577
578
579
580
581 public void reclaimMemStoreMemory() {
582 TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
583 if (isAboveHighWaterMark()) {
584 if (Trace.isTracing()) {
585 scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
586 }
587 long start = EnvironmentEdgeManager.currentTime();
588 synchronized (this.blockSignal) {
589 boolean blocked = false;
590 long startTime = 0;
591 boolean interrupted = false;
592 try {
593 while (isAboveHighWaterMark() && !server.isStopped()) {
594 if (!blocked) {
595 startTime = EnvironmentEdgeManager.currentTime();
596 LOG.info("Blocking updates on "
597 + server.toString()
598 + ": the global memstore size "
599 + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
600 .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
601 + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
602 }
603 blocked = true;
604 wakeupFlushThread();
605 try {
606
607
608 blockSignal.wait(5 * 1000);
609 } catch (InterruptedException ie) {
610 LOG.warn("Interrupted while waiting");
611 interrupted = true;
612 }
613 long took = EnvironmentEdgeManager.currentTime() - start;
614 LOG.warn("Memstore is above high water mark and block " + took + "ms");
615 }
616 } finally {
617 if (interrupted) {
618 Thread.currentThread().interrupt();
619 }
620 }
621
622 if(blocked){
623 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
624 if(totalTime > 0){
625 this.updatesBlockedMsHighWater.add(totalTime);
626 }
627 LOG.info("Unblocking updates for server " + server.toString());
628 }
629 }
630 } else if (isAboveLowWaterMark()) {
631 wakeupFlushThread();
632 }
633 scope.close();
634 }
635 @Override
636 public String toString() {
637 return "flush_queue="
638 + flushQueue.size();
639 }
640
641 public String dumpQueue() {
642 StringBuilder queueList = new StringBuilder();
643 queueList.append("Flush Queue Queue dump:\n");
644 queueList.append(" Flush Queue:\n");
645 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
646
647 while(it.hasNext()){
648 queueList.append(" "+it.next().toString());
649 queueList.append("\n");
650 }
651
652 return queueList.toString();
653 }
654
655
656
657
658
659 @Override
660 public void registerFlushRequestListener(final FlushRequestListener listener) {
661 this.flushRequestListeners.add(listener);
662 }
663
664
665
666
667
668
669 @Override
670 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
671 return this.flushRequestListeners.remove(listener);
672 }
673
674
675
676
677
678 @Override
679 public void setGlobalMemstoreLimit(long globalMemStoreSize) {
680 this.globalMemStoreLimit = globalMemStoreSize;
681 this.globalMemStoreLimitLowMark =
682 (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
683 reclaimMemStoreMemory();
684 }
685
686 public long getMemoryLimit() {
687 return this.globalMemStoreLimit;
688 }
689
/**
 * Marker type for everything that can be placed on the flush queue.
 */
interface FlushQueueEntry extends Delayed {}
692
693
694
695
696 static class WakeupFlushThread implements FlushQueueEntry {
697 @Override
698 public long getDelay(TimeUnit unit) {
699 return 0;
700 }
701
702 @Override
703 public int compareTo(Delayed o) {
704 return -1;
705 }
706
707 @Override
708 public boolean equals(Object obj) {
709 return (this == obj);
710 }
711 }
712
713
714
715
716
717
718
719
720
721 static class FlushRegionEntry implements FlushQueueEntry {
722 private final Region region;
723
724 private final long createTime;
725 private long whenToExpire;
726 private int requeueCount = 0;
727
728 private boolean forceFlushAllStores;
729
730 FlushRegionEntry(final Region r, boolean forceFlushAllStores) {
731 this.region = r;
732 this.createTime = EnvironmentEdgeManager.currentTime();
733 this.whenToExpire = this.createTime;
734 this.forceFlushAllStores = forceFlushAllStores;
735 }
736
737
738
739
740
741 public boolean isMaximumWait(final long maximumWait) {
742 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
743 }
744
745
746
747
748
749 public int getRequeueCount() {
750 return this.requeueCount;
751 }
752
753
754
755
756 public boolean isForceFlushAllStores() {
757 return forceFlushAllStores;
758 }
759
760
761
762
763
764
765
766 public FlushRegionEntry requeue(final long when) {
767 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
768 this.requeueCount++;
769 return this;
770 }
771
772 @Override
773 public long getDelay(TimeUnit unit) {
774 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
775 TimeUnit.MILLISECONDS);
776 }
777
778 @Override
779 public int compareTo(Delayed other) {
780
781 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
782 other.getDelay(TimeUnit.MILLISECONDS)).intValue();
783 if (ret != 0) {
784 return ret;
785 }
786 FlushQueueEntry otherEntry = (FlushQueueEntry) other;
787 return hashCode() - otherEntry.hashCode();
788 }
789
790 @Override
791 public String toString() {
792 return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]";
793 }
794
795 @Override
796 public int hashCode() {
797 int hash = (int) getDelay(TimeUnit.MILLISECONDS);
798 return hash ^ region.hashCode();
799 }
800
801 @Override
802 public boolean equals(Object obj) {
803 if (this == obj) {
804 return true;
805 }
806 if (obj == null || getClass() != obj.getClass()) {
807 return false;
808 }
809 Delayed other = (Delayed) obj;
810 return compareTo(other) == 0;
811 }
812 }
813 }