View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.util.Map;
22  
23  import java.util.concurrent.locks.ReentrantLock;
24  import java.util.concurrent.ConcurrentSkipListMap;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  
29  /**
30   * This class is a container of queues that allows selecting a queue
31   * in a round-robin fashion, taking the priority of each queue into account.
32   *
33   * The quantum is how many consecutive poll() calls will return the same object.
34   * e.g. if quantum is 1 and you have A and B as objects you'll get: A B A B
35   * e.g. if quantum is 2 and you have A and B as objects you'll get: A A B B A A B B
36   * The effective quantum of an object is its priority * quantum.
37   *
38   * Example:
39   *  - three queues (A, B, C) with priorities (1, 1, 2)
40   *  - The first poll() will return A
41   *  - The second poll() will return B
42   *  - The third and fourth poll() will return C
43   *  - and so on again and again.
44   */
45  @InterfaceAudience.Private
46  @InterfaceStability.Evolving
47  public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
48    private ConcurrentSkipListMap<TKey, TQueue> objMap =
49      new ConcurrentSkipListMap<TKey, TQueue>();
50  
51    private final ReentrantLock lock = new ReentrantLock();
52    private final int quantum;
53  
54    private Map.Entry<TKey, TQueue> current = null;
55    private int currentQuantum = 0;
56  
57    public interface FairObject {
58      boolean isAvailable();
59      int getPriority();
60    }
61  
62    /**
63     * @param quantum how many poll() will return the same object.
64     */
65    public ProcedureFairRunQueues(final int quantum) {
66      this.quantum = quantum;
67    }
68  
69    public TQueue get(final TKey key) {
70      return objMap.get(key);
71    }
72  
73    public TQueue add(final TKey key, final TQueue queue) {
74      TQueue oldq = objMap.putIfAbsent(key, queue);
75      return oldq != null ? oldq : queue;
76    }
77  
78    public TQueue remove(final TKey key) {
79      TQueue queue = objMap.get(key);
80      if (queue != null) {
81        lock.lock();
82        try {
83          queue = objMap.remove(key);
84          if (current != null && queue == current.getValue()) {
85            currentQuantum = 0;
86            current = null;
87          }
88        } finally {
89          lock.unlock();
90        }
91      }
92      return queue;
93    }
94  
95    public void clear() {
96      lock.lock();
97      try {
98        currentQuantum = 0;
99        current = null;
100       objMap.clear();
101     } finally {
102       lock.unlock();
103     }
104   }
105 
106   /**
107    * @return the next available item if present
108    */
109   public TQueue poll() {
110     lock.lock();
111     try {
112       TQueue queue;
113       if (currentQuantum == 0) {
114         if (nextObject() == null) {
115           // nothing here
116           return null;
117         }
118 
119         queue = current.getValue();
120         currentQuantum = calculateQuantum(queue) - 1;
121       } else {
122         currentQuantum--;
123         queue = current.getValue();
124       }
125 
126       if (!queue.isAvailable()) {
127         Map.Entry<TKey, TQueue> last = current;
128         // Try the next one
129         do {
130           if (nextObject() == null)
131             return null;
132         } while (current.getValue() != last.getValue() && !current.getValue().isAvailable());
133 
134         queue = current.getValue();
135         currentQuantum = calculateQuantum(queue) - 1;
136       }
137 
138       return queue;
139     } finally {
140       lock.unlock();
141     }
142   }
143 
144   @Override
145   public String toString() {
146     StringBuilder builder = new StringBuilder();
147     builder.append('{');
148     for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) {
149       builder.append(entry.getKey());
150       builder.append(':');
151       builder.append(entry.getValue());
152     }
153     builder.append('}');
154     return builder.toString();
155   }
156 
157   private Map.Entry<TKey, TQueue> nextObject() {
158     Map.Entry<TKey, TQueue> next = null;
159 
160     // If we have already a key, try the next one
161     if (current != null) {
162       next = objMap.higherEntry(current.getKey());
163     }
164 
165     // if there is no higher key, go back to the first
166     current = (next != null) ? next : objMap.firstEntry();
167     return current;
168   }
169 
170   private int calculateQuantum(final TQueue fairObject) {
171     // TODO
172     return Math.max(1, fairObject.getPriority() * quantum);
173   }
174 }