View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util;
19  
20  import java.util.Collection;
21  import java.util.concurrent.ConcurrentLinkedQueue;
22  import java.util.concurrent.atomic.AtomicLong;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.classification.InterfaceStability;
26  
27  /**
28   * A ConcurrentLinkedQueue that enforces a maximum queue size.
29   */
30  @InterfaceAudience.Private
31  @InterfaceStability.Stable
public class BoundedConcurrentLinkedQueue<T> extends ConcurrentLinkedQueue<T> {
  // Capacity is enforced through a separate atomic counter rather than the superclass's
  // traversal-based size(). Inserts first reserve room in the counter with a CAS loop and only
  // then link the element(s); removals decrement the counter after the element has left the
  // queue. The counter can therefore briefly run ahead of the real content, never behind it.
  //
  // NOTE(review): only the removal paths overridden below (poll, remove(Object), clear, drainTo)
  // maintain the counter. Removals through iterator().remove(), removeAll() or retainAll() are
  // not intercepted by this class and would leave the counter stale -- confirm no caller uses
  // them.

  private static final long serialVersionUID = 1L;

  /** Number of elements accounted for, including slots reserved by in-flight inserts. */
  private final AtomicLong size = new AtomicLong(0L);

  /** Largest value {@link #size} may reach; inserts that would exceed it are rejected. */
  private final long maxSize;

  /**
   * Creates a queue whose bound is {@link Long#MAX_VALUE}.
   */
  public BoundedConcurrentLinkedQueue() {
    this(Long.MAX_VALUE);
  }

  /**
   * Creates a queue that accepts at most {@code maxSize} elements.
   *
   * @param maxSize maximum number of elements; with a value of zero or less every
   *          {@link #offer(Object)} is rejected
   */
  public BoundedConcurrentLinkedQueue(long maxSize) {
    super();
    this.maxSize = maxSize;
  }

  /**
   * Appends every element of {@code c}, or none of them if they do not all fit.
   *
   * @param c elements to append
   * @return {@code false} if adding {@code c.size()} elements would exceed the bound, otherwise
   *         whatever the superclass reports ({@code false} for an empty collection)
   * @throws NullPointerException if {@code c} or one of its elements is null
   * @throws IllegalArgumentException if {@code c} is this queue and the reservation succeeded
   */
  @Override
  public boolean addAll(Collection<? extends T> c) {
    // Read the size once so that the amount reserved and the amount rolled back always match.
    final long incoming = c.size();
    for (;;) {
      long currentSize = size.get();
      long nextSize = currentSize + incoming;
      if (nextSize > maxSize) { // the batch would not fit
        return false;
      }
      if (size.compareAndSet(currentSize, nextSize)) {
        break;
      }
    }
    boolean added = false;
    try {
      added = super.addAll(c);
      return added;
    } finally {
      if (!added) {
        // The superclass threw or appended nothing: give the reserved slots back so the
        // capacity is not lost for good.
        // NOTE(review): assumes super.addAll links no element when it throws (true for
        // OpenJDK, which validates the whole batch before appending) -- confirm for other
        // runtimes.
        size.addAndGet(-incoming);
      }
    }
  }

  /**
   * Removes all elements by polling until the queue is empty, then adjusts the counter once.
   */
  @Override
  public void clear() {
    // override this method to batch update size.
    long removed = 0L;
    while (super.poll() != null) {
      removed++;
    }
    size.addAndGet(-removed);
  }

  /**
   * Inserts {@code e} at the tail if the bound has not been reached.
   *
   * @param e element to insert
   * @return {@code true} if the element was inserted, {@code false} if the queue is full
   * @throws NullPointerException if {@code e} is null
   */
  @Override
  public boolean offer(T e) {
    // Reject null before reserving a slot; the superclass would throw only after the counter
    // had already been incremented, leaking one unit of capacity per attempt.
    if (e == null) {
      throw new NullPointerException();
    }
    for (;;) {
      long currentSize = size.get();
      if (currentSize >= maxSize) { // queue is full
        return false;
      }
      if (size.compareAndSet(currentSize, currentSize + 1)) {
        break;
      }
    }
    return super.offer(e);
  }

  /**
   * Retrieves and removes the head of the queue.
   *
   * @return the head element, or {@code null} if the queue is empty
   */
  @Override
  public T poll() {
    T result = super.poll();
    if (result != null) {
      size.decrementAndGet();
    }
    return result;
  }

  /**
   * Removes a single instance of {@code o}, if present.
   *
   * @param o element to remove
   * @return {@code true} if an element was removed
   */
  @Override
  public boolean remove(Object o) {
    boolean result = super.remove(o);
    if (result) {
      size.decrementAndGet();
    }
    return result;
  }

  /**
   * Returns the tracked element count without traversing the queue.
   *
   * @return the counter value, capped at {@link Integer#MAX_VALUE}
   */
  @Override
  public int size() {
    long current = size.get();
    // A plain (int) cast would wrap for counts above Integer.MAX_VALUE; saturate instead.
    return current > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) current;
  }

  /**
   * Moves every element currently in the queue into {@code list}, in queue order.
   *
   * @param list destination collection
   * @throws IllegalArgumentException if {@code list} is this queue
   */
  public void drainTo(Collection<? super T> list) {
    if (list == this) {
      // Polling from and re-offering to the same queue cannot terminate sensibly.
      throw new IllegalArgumentException("Cannot drain a queue into itself");
    }
    long removed = 0;
    try {
      for (T element; (element = super.poll()) != null;) {
        // Count first: the element has already left the queue even if add() throws.
        removed++;
        list.add(element);
      }
    } finally {
      // Limit the number of operations on size by only reporting size change after the drain
      // is completed (or aborted by an exception from the destination).
      size.addAndGet(-removed);
    }
  }

  /**
   * Returns how many more elements can be accepted before the bound is hit.
   *
   * @return {@code maxSize} minus the tracked element count
   */
  public long remainingCapacity() {
    return maxSize - size.get();
  }
}