1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
20 import java.util.Collection;
21 import java.util.concurrent.ConcurrentLinkedQueue;
22 import java.util.concurrent.atomic.AtomicLong;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceStability;
26
27
28
29
30 @InterfaceAudience.Private
31 @InterfaceStability.Stable
32 public class BoundedConcurrentLinkedQueue<T> extends ConcurrentLinkedQueue<T> {
33 private static final long serialVersionUID = 1L;
34 private final AtomicLong size = new AtomicLong(0L);
35 private final long maxSize;
36
37 public BoundedConcurrentLinkedQueue() {
38 this(Long.MAX_VALUE);
39 }
40
41 public BoundedConcurrentLinkedQueue(long maxSize) {
42 super();
43 this.maxSize = maxSize;
44 }
45
46 @Override
47 public boolean addAll(Collection<? extends T> c) {
48 for (;;) {
49 long currentSize = size.get();
50 long nextSize = currentSize + c.size();
51 if (nextSize > maxSize) {
52 return false;
53 }
54 if (size.compareAndSet(currentSize, nextSize)) {
55 break;
56 }
57 }
58 return super.addAll(c);
59 }
60
61 @Override
62 public void clear() {
63
64 long removed = 0L;
65 while (super.poll() != null) {
66 removed++;
67 }
68 size.addAndGet(-removed);
69 }
70
71 @Override
72 public boolean offer(T e) {
73 for (;;) {
74 long currentSize = size.get();
75 if (currentSize >= maxSize) {
76 return false;
77 }
78 if (size.compareAndSet(currentSize, currentSize + 1)) {
79 break;
80 }
81 }
82 return super.offer(e);
83 }
84
85 @Override
86 public T poll() {
87 T result = super.poll();
88 if (result != null) {
89 size.decrementAndGet();
90 }
91 return result;
92 }
93
94 @Override
95 public boolean remove(Object o) {
96 boolean result = super.remove(o);
97 if (result) {
98 size.decrementAndGet();
99 }
100 return result;
101 }
102
103 @Override
104 public int size() {
105 return (int) size.get();
106 }
107
108 public void drainTo(Collection<T> list) {
109 long removed = 0;
110 for (T element; (element = super.poll()) != null;) {
111 list.add(element);
112 removed++;
113 }
114
115
116 size.addAndGet(-removed);
117 }
118
119 public long remainingCapacity() {
120 return maxSize - size.get();
121 }
122 }