/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;

/**
 * Compact region on request and then run split if appropriate
 */
@InterfaceAudience.Private
public class CompactSplitThread implements CompactionRequestor {
  static final Log LOG = LogFactory.getLog(CompactSplitThread.class);

  private final HRegionServer server;
  private final Configuration conf;

  private final ThreadPoolExecutor longCompactions;
  private final ThreadPoolExecutor shortCompactions;
  private final ThreadPoolExecutor splits;
  private final ThreadPoolExecutor mergePool;

  /**
   * Splitting should not take place if the total number of regions exceeds this.
   * This is not a hard limit on the number of regions but a guideline to stop
   * splitting once the number of online regions is greater than this.
   */
  private int regionSplitLimit;

  /** @param server the HRegionServer that owns these compaction/split pools */
  CompactSplitThread(HRegionServer server) {
    super();
    this.server = server;
    this.conf = server.getConfiguration();
    this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
        Integer.MAX_VALUE);

    int largeThreads = Math.max(1, conf.getInt(
        "hbase.regionserver.thread.compaction.large", 1));
    int smallThreads = conf.getInt(
        "hbase.regionserver.thread.compaction.small", 1);

    int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);

    // sanity check: both the large and small compaction pools need at least one thread
    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);

    final String n = Thread.currentThread().getName();

    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
        60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(n + "-longCompactions-" + System.currentTimeMillis());
            return t;
          }
        });
    this.longCompactions.setRejectedExecutionHandler(new Rejection());
    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
        60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
            return t;
          }
        });
    this.shortCompactions
        .setRejectedExecutionHandler(new Rejection());
    this.splits = (ThreadPoolExecutor)
        Executors.newFixedThreadPool(splitThreads,
            new ThreadFactory() {
              @Override
              public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName(n + "-splits-" + System.currentTimeMillis());
                return t;
              }
            });
    int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1);
    this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
        mergeThreads, new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(n + "-merges-" + System.currentTimeMillis());
            return t;
          }
        });
  }
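
  // The pools above are sized purely from configuration. For illustration, a
  // deployment that wants more concurrent compactions could set something like
  // the following in hbase-site.xml (values are examples, not tuning advice):
  //
  //   <property>
  //     <name>hbase.regionserver.thread.compaction.large</name>
  //     <value>2</value>
  //   </property>
  //   <property>
  //     <name>hbase.regionserver.thread.compaction.small</name>
  //     <value>3</value>
  //   </property>
  //
  // hbase.regionserver.thread.split and hbase.regionserver.thread.merge size the
  // split and merge pools the same way; all four pools default to a single thread,
  // and hbase.regionserver.regionSplitLimit defaults to Integer.MAX_VALUE
  // (effectively unlimited).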

  @Override
  public String toString() {
    return "compaction_queue=("
        + longCompactions.getQueue().size() + ":"
        + shortCompactions.getQueue().size() + ")"
        + ", split_queue=" + splits.getQueue().size()
        + ", merge_queue=" + mergePool.getQueue().size();
  }
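
  // For example, a server with one queued large compaction, three queued small
  // compactions and one pending split would render as:
  //   compaction_queue=(1:3), split_queue=1, merge_queue=0
  // where the pair is (long pool queue size : short pool queue size).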

  public String dumpQueue() {
    StringBuilder queueLists = new StringBuilder();
    queueLists.append("Compaction/Split Queue dump:\n");
    queueLists.append("  LargeCompaction Queue:\n");
    BlockingQueue<Runnable> lq = longCompactions.getQueue();
    Iterator<Runnable> it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    if (shortCompactions != null) {
      queueLists.append("\n");
      queueLists.append("  SmallCompaction Queue:\n");
      lq = shortCompactions.getQueue();
      it = lq.iterator();
      while (it.hasNext()) {
        queueLists.append("    " + it.next().toString());
        queueLists.append("\n");
      }
    }

    queueLists.append("\n");
    queueLists.append("  Split Queue:\n");
    lq = splits.getQueue();
    it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    queueLists.append("\n");
    queueLists.append("  Region Merge Queue:\n");
    lq = mergePool.getQueue();
    it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }

    return queueLists.toString();
  }

  public synchronized void requestRegionsMerge(final HRegion a,
      final HRegion b, final boolean forcible) {
    try {
      mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
            + forcible + ".  " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
          + forcible, ree);
    }
  }
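
  // Note that the merge request above is asynchronous and best-effort: if the merge
  // pool rejects the task (for example because it is shutting down), the rejection
  // is only logged and the caller is not notified.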

  public synchronized boolean requestSplit(final HRegion r) {
    // don't split regions that are blocking
    if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
      byte[] midKey = r.checkSplit();
      if (midKey != null) {
        requestSplit(r, midKey);
        return true;
      }
    }
    return false;
  }
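
  // A split is only attempted when the server is below the configured region limit
  // (hbase.regionserver.regionSplitLimit), the region is not blocked (its compact
  // priority is at or above Store.PRIORITY_USER) and checkSplit() yields a midkey;
  // otherwise the request is quietly dropped and this method returns false.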

  public synchronized void requestSplit(final HRegion r, byte[] midKey) {
    if (midKey == null) {
      LOG.debug("Region " + r.getRegionNameAsString() +
        " not splittable because midkey=null");
      return;
    }
    try {
      this.splits.execute(new SplitRequest(r, midKey, this.server));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Split requested for " + r + ".  " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.info("Could not execute split for " + r, ree);
    }
  }

  @Override
  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
      throws IOException {
    return requestCompaction(r, why, null);
  }

  @Override
  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
      List<Pair<CompactionRequest, Store>> requests) throws IOException {
    return requestCompaction(r, why, Store.NO_PRIORITY, requests);
  }

  @Override
  public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
      final String why, CompactionRequest request) throws IOException {
    return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
  }

  @Override
  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
      int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
    return requestCompactionInternal(r, why, p, requests, true);
  }

  private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
    // not a special compaction request, so make our own list
    List<CompactionRequest> ret = null;
    if (requests == null) {
      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
      for (Store s : r.getStores().values()) {
        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
        if (selectNow) ret.add(cr);
      }
    } else {
      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
      ret = new ArrayList<CompactionRequest>(requests.size());
      for (Pair<CompactionRequest, Store> pair : requests) {
        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
      }
    }
    return ret;
  }

  public CompactionRequest requestCompaction(final HRegion r, final Store s,
      final String why, int priority, CompactionRequest request) throws IOException {
    return requestCompactionInternal(r, s, why, priority, request, true);
  }

  public synchronized void requestSystemCompaction(
      final HRegion r, final String why) throws IOException {
    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
  }

  public void requestSystemCompaction(
      final HRegion r, final Store s, final String why) throws IOException {
    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
  }
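
  // System compactions pass selectNow = false: no files are selected at request time.
  // Selection is deferred to CompactionRunner.run(), which re-reads the store's
  // compact priority and may re-queue the work onto the other pool once the actual
  // compaction size is known.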

  /**
   * @param r HRegion store belongs to
   * @param s Store to request compaction on
   * @param why Why compaction requested -- used in debug messages
   * @param priority override the default priority (NO_PRIORITY == decide)
   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
   *          compaction will be used.
   */
  private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
      final String why, int priority, CompactionRequest request, boolean selectNow)
          throws IOException {
    if (this.server.isStopped()
        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
      return null;
    }

    CompactionContext compaction = null;
    if (selectNow) {
      compaction = selectCompaction(r, s, priority, request);
      if (compaction == null) return null; // message logged inside
    }

    // We assume that most compactions are small. So, put system compactions into small
    // pool; we will do selection there, and move to large pool if necessary.
    long size = selectNow ? compaction.getRequest().getSize() : 0;
    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(size))
      ? longCompactions : shortCompactions;
    pool.execute(new CompactionRunner(s, r, compaction, pool));
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
    return selectNow ? compaction.getRequest() : null;
  }
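
  // Pool routing performed by the method above, in short: a user request
  // (selectNow == true) selects its files immediately and goes to longCompactions
  // when the selected size exceeds the store's throttle point, otherwise to
  // shortCompactions; a system request (selectNow == false) always starts in
  // shortCompactions and CompactionRunner may move it to longCompactions later.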

  private CompactionContext selectCompaction(final HRegion r, final Store s,
      int priority, CompactionRequest request) throws IOException {
    CompactionContext compaction = s.requestCompaction(priority, request);
    if (compaction == null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not compacting " + r.getRegionNameAsString() +
            " because compaction request was cancelled");
      }
      return null;
    }
    assert compaction.hasSelection();
    if (priority != Store.NO_PRIORITY) {
      compaction.getRequest().setPriority(priority);
    }
    return compaction;
  }

  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    splits.shutdown();
    mergePool.shutdown();
    longCompactions.shutdown();
    shortCompactions.shutdown();
  }

  private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    while (!done) {
      try {
        done = t.awaitTermination(60, TimeUnit.SECONDS);
        LOG.info("Waiting for " + name + " to finish...");
        if (!done) {
          t.shutdownNow();
        }
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted waiting for " + name + " to finish...");
      }
    }
  }

  void join() {
    waitFor(splits, "Split Thread");
    waitFor(mergePool, "Merge Thread");
    waitFor(longCompactions, "Large Compaction Thread");
    waitFor(shortCompactions, "Small Compaction Thread");
  }

  /**
   * Returns the current size of the compaction queue, i.e. the number of
   * compactions waiting to run across the large and small pools.
   *
   * @return The current compaction queue size.
   */
  public int getCompactionQueueSize() {
    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
  }

  public int getLargeCompactionQueueSize() {
    return longCompactions.getQueue().size();
  }

  public int getSmallCompactionQueueSize() {
    return shortCompactions.getQueue().size();
  }

  private boolean shouldSplitRegion() {
    return (regionSplitLimit > server.getNumberOfOnlineRegions());
  }

  /**
   * @return the regionSplitLimit
   */
  public int getRegionSplitLimit() {
    return this.regionSplitLimit;
  }

  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
    private final Store store;
    private final HRegion region;
    private CompactionContext compaction;
    private int queuedPriority;
    private ThreadPoolExecutor parent;

    public CompactionRunner(Store store, HRegion region,
        CompactionContext compaction, ThreadPoolExecutor parent) {
      super();
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      this.queuedPriority = (this.compaction == null)
          ? store.getCompactPriority() : compaction.getRequest().getPriority();
      this.parent = parent;
    }

    @Override
    public String toString() {
      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
          : ("Store = " + store.toString() + ", pri = " + queuedPriority);
    }

    @Override
    public void run() {
      Preconditions.checkNotNull(server);
      if (server.isStopped()
          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
        return;
      }
      // Common case - system compaction without a file selection. Select now.
      if (this.compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          this.parent.execute(this);
          return;
        }
        try {
          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          return;
        }
        if (this.compaction == null) return; // nothing to do
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.
        assert this.compaction.hasSelection();
        ThreadPoolExecutor pool = store.throttleCompaction(
            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
        if (this.parent != pool) {
          this.store.cancelRequestedCompaction(this.compaction);
          this.compaction = null;
          this.parent = pool;
          this.parent.execute(this);
          return;
        }
      }
      // Finally we can compact something.
      assert this.compaction != null;

      this.compaction.getRequest().beforeExecute();
      try {
        // Note: please don't put single-compaction logic here;
        //       put it into region/store/etc. This is CST logic.
        long start = EnvironmentEdgeManager.currentTimeMillis();
        boolean completed = region.compact(compaction, store);
        long now = EnvironmentEdgeManager.currentTimeMillis();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (store.getCompactPriority() <= 0) {
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            requestSplit(region);
          }
        }
      } catch (IOException ex) {
        IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        server.checkFileSystem();
      } finally {
        LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
      }
      this.compaction.getRequest().afterExecute();
    }

    private String formatStackTrace(Exception ex) {
      StringWriter sw = new StringWriter();
      PrintWriter pw = new PrintWriter(sw);
      ex.printStackTrace(pw);
      pw.flush();
      return sw.toString();
    }

    @Override
    public int compareTo(CompactionRunner o) {
      // Only compare the underlying request (if any), for queue sorting purposes.
      // Compare priorities without subtracting so that extreme values such as
      // Store.NO_PRIORITY (Integer.MIN_VALUE) cannot overflow the comparison.
      if (this.queuedPriority != o.queuedPriority) {
        return (this.queuedPriority < o.queuedPriority) ? -1 : 1;
      }
      CompactionContext tc = this.compaction, oc = o.compaction;
      // Sort pre-selected (user?) compactions before system ones with equal priority.
      return (tc == null) ? ((oc == null) ? 0 : 1)
          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
    }
  }
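
  // CompactionRunner instances sit in each pool's PriorityBlockingQueue, so the
  // compareTo() above determines execution order: lower queuedPriority runs first,
  // and at equal priority a runner that already holds a selected compaction sorts
  // ahead of a system runner that has not selected its files yet.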

  /**
   * Cleanup class to use when rejecting a compaction request from the queue.
   */
  private static class Rejection implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
      if (runnable instanceof CompactionRunner) {
        CompactionRunner runner = (CompactionRunner)runnable;
        LOG.debug("Compaction Rejected: " + runner);
        runner.store.cancelRequestedCompaction(runner.compaction);
      }
    }
  }
}