View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.concurrent.Executors;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.PriorityBlockingQueue;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.ThreadFactory;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HTableDescriptor;
39  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
40  
41  import com.google.common.base.Preconditions;
42  
43  /**
44   * Compact region on request and then run split if appropriate
45   */
46  public class CompactSplitThread implements CompactionRequestor {
47    static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
48  
49    private final HRegionServer server;
50    private final Configuration conf;
51  
52    private final ThreadPoolExecutor largeCompactions;
53    private final ThreadPoolExecutor smallCompactions;
54    private final ThreadPoolExecutor splits;
55  
56    /**
57     * Splitting should not take place if the total number of regions exceed this.
58     * This is not a hard limit to the number of regions but it is a guideline to
59     * stop splitting after number of online regions is greater than this.
60     */
61    private int regionSplitLimit;
62  
63    /** @param server */
64    CompactSplitThread(HRegionServer server) {
65      super();
66      this.server = server;
67      this.conf = server.getConfiguration();
68      this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
69          Integer.MAX_VALUE);
70  
71      int largeThreads = Math.max(1, conf.getInt(
72          "hbase.regionserver.thread.compaction.large", 1));
73      int smallThreads = conf.getInt(
74          "hbase.regionserver.thread.compaction.small", 1);
75  
76      int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
77  
78      // if we have throttle threads, make sure the user also specified size
79      Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
80  
81      final String n = Thread.currentThread().getName();
82  
83      this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
84          60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
85          new ThreadFactory() {
86            @Override
87            public Thread newThread(Runnable r) {
88              Thread t = new Thread(r);
89              t.setName(n + "-largeCompactions-" + System.currentTimeMillis());
90              return t;
91            }
92        });
93      this.largeCompactions
94          .setRejectedExecutionHandler(new CompactionRequest.Rejection());
95      this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
96          60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
97          new ThreadFactory() {
98            @Override
99            public Thread newThread(Runnable r) {
100             Thread t = new Thread(r);
101             t.setName(n + "-smallCompactions-" + System.currentTimeMillis());
102             return t;
103           }
104       });
105     this.smallCompactions
106         .setRejectedExecutionHandler(new CompactionRequest.Rejection());
107     this.splits = (ThreadPoolExecutor)
108         Executors.newFixedThreadPool(splitThreads,
109             new ThreadFactory() {
110           @Override
111           public Thread newThread(Runnable r) {
112             Thread t = new Thread(r);
113             t.setName(n + "-splits-" + System.currentTimeMillis());
114             return t;
115           }
116       });
117   }
118 
119   @Override
120   public String toString() {
121     return "compaction_queue=("
122         + largeCompactions.getQueue().size() + ":"
123         + smallCompactions.getQueue().size() + ")"
124         + ", split_queue=" + splits.getQueue().size();
125   }
126 
127   public String dumpQueue() {
128     StringBuffer queueLists = new StringBuffer();
129     queueLists.append("Compaction/Split Queue dump:\n");
130     queueLists.append("  LargeCompation Queue:\n");
131     BlockingQueue<Runnable> lq = largeCompactions.getQueue();
132     Iterator it = lq.iterator();
133     while(it.hasNext()){
134       queueLists.append("    "+it.next().toString());
135       queueLists.append("\n");
136     }
137     
138     if( smallCompactions != null ){
139       queueLists.append("\n");
140       queueLists.append("  SmallCompation Queue:\n");
141       lq = smallCompactions.getQueue();
142       it = lq.iterator();
143       while(it.hasNext()){
144         queueLists.append("    "+it.next().toString());
145         queueLists.append("\n");
146       }
147     }
148     
149     queueLists.append("\n");
150     queueLists.append("  Split Queue:\n");
151     lq = splits.getQueue();
152     it = lq.iterator();
153     while(it.hasNext()){
154       queueLists.append("    "+it.next().toString());
155       queueLists.append("\n");
156     }
157     
158     return queueLists.toString();
159   }
160 
161   public synchronized boolean requestSplit(final HRegion r) {
162     // don't split regions that are blocking
163     if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
164       byte[] midKey = r.checkSplit();
165       if (midKey != null) {
166         requestSplit(r, midKey);
167         return true;
168       }
169     }
170     return false;
171   }
172 
173   public synchronized void requestSplit(final HRegion r, byte[] midKey) {
174     if (midKey == null) {
175       LOG.debug("Region " + r.getRegionNameAsString() +
176         " not splittable because midkey=null");
177       return;
178     }
179     try {
180       this.splits.execute(new SplitRequest(r, midKey, this.server));
181       if (LOG.isDebugEnabled()) {
182         LOG.debug("Split requested for " + r + ".  " + this);
183       }
184     } catch (RejectedExecutionException ree) {
185       LOG.info("Could not execute split for " + r, ree);
186     }
187   }
188 
189   @Override
190   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
191       throws IOException {
192     return requestCompaction(r, why, null);
193   }
194 
195   @Override
196   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
197       List<CompactionRequest> requests) throws IOException {
198     return requestCompaction(r, why, Store.NO_PRIORITY, requests);
199   }
200 
201   @Override
202   public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
203       final String why,
204       CompactionRequest request) throws IOException {
205     return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
206   }
207 
208   @Override
209   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
210       int pri, final List<CompactionRequest> requests) throws IOException {
211     List<CompactionRequest> ret;
212     // not a special compaction request, so make out own list
213     if (requests == null) {
214       ret = new ArrayList<CompactionRequest>(r.getStores().size());
215       for (Store s : r.getStores().values()) {
216         ret.add(requestCompaction(r, s, why, pri, null));
217       }
218     } else {
219       ret = new ArrayList<CompactionRequest>(requests.size());
220       for (CompactionRequest request : requests) {
221         ret.add(requestCompaction(r, request.getStore(), why, pri, request));
222       }
223     }
224     return ret;
225   }
226 
227   @Override
228   public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
229       final String why, int priority, CompactionRequest request) throws IOException {
230     if (this.server.isStopped()) {
231       return null;
232     }
233     CompactionRequest cr = s.requestCompaction(priority, request);
234     if (cr != null) {
235       cr.setServer(server);
236       if (priority != Store.NO_PRIORITY) {
237         cr.setPriority(priority);
238       }
239       ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
240           ? largeCompactions : smallCompactions;
241       pool.execute(cr);
242       if (LOG.isDebugEnabled()) {
243         String type = (pool == smallCompactions) ? "Small " : "Large ";
244         LOG.debug(type + "Compaction requested: " + cr
245             + (why != null && !why.isEmpty() ? "; Because: " + why : "")
246             + "; " + this);
247       }
248     } else {
249       if(LOG.isDebugEnabled()) {
250         LOG.debug("Not compacting " + r.getRegionNameAsString() + 
251             " because compaction request was cancelled");
252       }
253     }
254     return cr;
255   }
256 
257   /**
258    * Only interrupt once it's done with a run through the work loop.
259    */
260   void interruptIfNecessary() {
261     splits.shutdown();
262     largeCompactions.shutdown();
263     smallCompactions.shutdown();
264   }
265 
266   private void waitFor(ThreadPoolExecutor t, String name) {
267     boolean done = false;
268     while (!done) {
269       try {
270         done = t.awaitTermination(60, TimeUnit.SECONDS);
271         LOG.debug("Waiting for " + name + " to finish...");
272         if (!done) {
273           t.shutdownNow();
274         }
275       } catch (InterruptedException ie) {
276         LOG.debug("Interrupted waiting for " + name + " to finish...");
277       }
278     }
279   }
280 
281   void join() {
282     waitFor(splits, "Split Thread");
283     waitFor(largeCompactions, "Large Compaction Thread");
284     waitFor(smallCompactions, "Small Compaction Thread");
285   }
286 
287   /**
288    * Returns the current size of the queue containing regions that are
289    * processed.
290    *
291    * @return The current size of the regions queue.
292    */
293   public int getCompactionQueueSize() {
294     return largeCompactions.getQueue().size() + smallCompactions.getQueue().size();
295   }
296 
297   private boolean shouldSplitRegion() {
298     return (regionSplitLimit > server.getNumberOfOnlineRegions());
299   }
300 
301   /**
302    * @return the regionSplitLimit
303    */
304   public int getRegionSplitLimit() {
305     return this.regionSplitLimit;
306   }
307 }