View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.PriorityBlockingQueue;
30  import java.util.concurrent.RejectedExecutionException;
31  import java.util.concurrent.RejectedExecutionHandler;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.classification.InterfaceAudience;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
41  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
42  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43  import org.apache.hadoop.hbase.util.Pair;
44  import org.apache.hadoop.ipc.RemoteException;
45  import org.apache.hadoop.util.StringUtils;
46  
47  import com.google.common.base.Preconditions;
48  
49  /**
50   * Compact region on request and then run split if appropriate
51   */
52  @InterfaceAudience.Private
53  public class CompactSplitThread implements CompactionRequestor {
54    static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
55  
56    private final HRegionServer server;
57    private final Configuration conf;
58  
59    private final ThreadPoolExecutor longCompactions;
60    private final ThreadPoolExecutor shortCompactions;
61    private final ThreadPoolExecutor splits;
62    private final ThreadPoolExecutor mergePool;
63  
64    /**
65     * Splitting should not take place if the total number of regions exceed this.
66     * This is not a hard limit to the number of regions but it is a guideline to
67     * stop splitting after number of online regions is greater than this.
68     */
69    private int regionSplitLimit;
70  
71    /** @param server */
72    CompactSplitThread(HRegionServer server) {
73      super();
74      this.server = server;
75      this.conf = server.getConfiguration();
76      this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
77          Integer.MAX_VALUE);
78  
79      int largeThreads = Math.max(1, conf.getInt(
80          "hbase.regionserver.thread.compaction.large", 1));
81      int smallThreads = conf.getInt(
82          "hbase.regionserver.thread.compaction.small", 1);
83  
84      int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
85  
86      // if we have throttle threads, make sure the user also specified size
87      Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
88  
89      final String n = Thread.currentThread().getName();
90  
91      this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
92          60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
93          new ThreadFactory() {
94            @Override
95            public Thread newThread(Runnable r) {
96              Thread t = new Thread(r);
97              t.setName(n + "-longCompactions-" + System.currentTimeMillis());
98              return t;
99            }
100       });
101     this.longCompactions.setRejectedExecutionHandler(new Rejection());
102     this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
103         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
104         new ThreadFactory() {
105           @Override
106           public Thread newThread(Runnable r) {
107             Thread t = new Thread(r);
108             t.setName(n + "-shortCompactions-" + System.currentTimeMillis());
109             return t;
110           }
111       });
112     this.shortCompactions
113         .setRejectedExecutionHandler(new Rejection());
114     this.splits = (ThreadPoolExecutor)
115         Executors.newFixedThreadPool(splitThreads,
116             new ThreadFactory() {
117           @Override
118           public Thread newThread(Runnable r) {
119             Thread t = new Thread(r);
120             t.setName(n + "-splits-" + System.currentTimeMillis());
121             return t;
122           }
123       });
124     int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1);
125     this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
126         mergeThreads, new ThreadFactory() {
127           @Override
128           public Thread newThread(Runnable r) {
129             Thread t = new Thread(r);
130             t.setName(n + "-merges-" + System.currentTimeMillis());
131             return t;
132           }
133         });
134   }
135 
136   @Override
137   public String toString() {
138     return "compaction_queue=("
139         + longCompactions.getQueue().size() + ":"
140         + shortCompactions.getQueue().size() + ")"
141         + ", split_queue=" + splits.getQueue().size()
142         + ", merge_queue=" + mergePool.getQueue().size();
143   }
144   
145   public String dumpQueue() {
146     StringBuffer queueLists = new StringBuffer();
147     queueLists.append("Compaction/Split Queue dump:\n");
148     queueLists.append("  LargeCompation Queue:\n");
149     BlockingQueue<Runnable> lq = longCompactions.getQueue();
150     Iterator it = lq.iterator();
151     while(it.hasNext()){
152       queueLists.append("    "+it.next().toString());
153       queueLists.append("\n");
154     }
155 
156     if( shortCompactions != null ){
157       queueLists.append("\n");
158       queueLists.append("  SmallCompation Queue:\n");
159       lq = shortCompactions.getQueue();
160       it = lq.iterator();
161       while(it.hasNext()){
162         queueLists.append("    "+it.next().toString());
163         queueLists.append("\n");
164       }
165     }
166     
167     queueLists.append("\n");
168     queueLists.append("  Split Queue:\n");
169     lq = splits.getQueue();
170     it = lq.iterator();
171     while(it.hasNext()){
172       queueLists.append("    "+it.next().toString());
173       queueLists.append("\n");
174     }
175     
176     queueLists.append("\n");
177     queueLists.append("  Region Merge Queue:\n");
178     lq = mergePool.getQueue();
179     it = lq.iterator();
180     while (it.hasNext()) {
181       queueLists.append("    " + it.next().toString());
182       queueLists.append("\n");
183     }
184 
185     return queueLists.toString();
186   }
187 
188   public synchronized void requestRegionsMerge(final HRegion a,
189       final HRegion b, final boolean forcible) {
190     try {
191       mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
192       if (LOG.isDebugEnabled()) {
193         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
194             + forcible + ".  " + this);
195       }
196     } catch (RejectedExecutionException ree) {
197       LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
198           + forcible, ree);
199     }
200   }
201 
202   public synchronized boolean requestSplit(final HRegion r) {
203     // don't split regions that are blocking
204     if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
205       byte[] midKey = r.checkSplit();
206       if (midKey != null) {
207         requestSplit(r, midKey);
208         return true;
209       }
210     }
211     return false;
212   }
213 
214   public synchronized void requestSplit(final HRegion r, byte[] midKey) {
215     if (midKey == null) {
216       LOG.debug("Region " + r.getRegionNameAsString() +
217         " not splittable because midkey=null");
218       if (r.shouldForceSplit()) {
219         r.clearSplit();
220       }
221       return;
222     }
223     try {
224       this.splits.execute(new SplitRequest(r, midKey, this.server));
225       if (LOG.isDebugEnabled()) {
226         LOG.debug("Split requested for " + r + ".  " + this);
227       }
228     } catch (RejectedExecutionException ree) {
229       LOG.info("Could not execute split for " + r, ree);
230     }
231   }
232 
233   @Override
234   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
235       throws IOException {
236     return requestCompaction(r, why, null);
237   }
238 
239   @Override
240   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
241       List<Pair<CompactionRequest, Store>> requests) throws IOException {
242     return requestCompaction(r, why, Store.NO_PRIORITY, requests);
243   }
244 
245   @Override
246   public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
247       final String why, CompactionRequest request) throws IOException {
248     return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
249   }
250 
251   @Override
252   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
253       int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
254     return requestCompactionInternal(r, why, p, requests, true);
255   }
256 
257   private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
258       int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
259     // not a special compaction request, so make our own list
260     List<CompactionRequest> ret = null;
261     if (requests == null) {
262       ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
263       for (Store s : r.getStores().values()) {
264         CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
265         if (selectNow) ret.add(cr);
266       }
267     } else {
268       Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
269       ret = new ArrayList<CompactionRequest>(requests.size());
270       for (Pair<CompactionRequest, Store> pair : requests) {
271         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
272       }
273     }
274     return ret;
275   }
276 
277   public CompactionRequest requestCompaction(final HRegion r, final Store s,
278       final String why, int priority, CompactionRequest request) throws IOException {
279     return requestCompactionInternal(r, s, why, priority, request, true);
280   }
281 
282   public synchronized void requestSystemCompaction(
283       final HRegion r, final String why) throws IOException {
284     requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
285   }
286 
287   public void requestSystemCompaction(
288       final HRegion r, final Store s, final String why) throws IOException {
289     requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
290   }
291 
292   /**
293    * @param r HRegion store belongs to
294    * @param s Store to request compaction on
295    * @param why Why compaction requested -- used in debug messages
296    * @param priority override the default priority (NO_PRIORITY == decide)
297    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
298    *          compaction will be used.
299    */
300   private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
301       final String why, int priority, CompactionRequest request, boolean selectNow)
302           throws IOException {
303     if (this.server.isStopped()
304         || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
305       return null;
306     }
307 
308     CompactionContext compaction = null;
309     if (selectNow) {
310       compaction = selectCompaction(r, s, priority, request);
311       if (compaction == null) return null; // message logged inside
312     }
313 
314     // We assume that most compactions are small. So, put system compactions into small
315     // pool; we will do selection there, and move to large pool if necessary.
316     long size = selectNow ? compaction.getRequest().getSize() : 0;
317     ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
318       ? longCompactions : shortCompactions;
319     pool.execute(new CompactionRunner(s, r, compaction, pool));
320     if (LOG.isDebugEnabled()) {
321       String type = (pool == shortCompactions) ? "Small " : "Large ";
322       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
323           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
324     }
325     return selectNow ? compaction.getRequest() : null;
326   }
327 
328   private CompactionContext selectCompaction(final HRegion r, final Store s,
329       int priority, CompactionRequest request) throws IOException {
330     CompactionContext compaction = s.requestCompaction(priority, request);
331     if (compaction == null) {
332       if(LOG.isDebugEnabled()) {
333         LOG.debug("Not compacting " + r.getRegionNameAsString() +
334             " because compaction request was cancelled");
335       }
336       return null;
337     }
338     assert compaction.hasSelection();
339     if (priority != Store.NO_PRIORITY) {
340       compaction.getRequest().setPriority(priority);
341     }
342     return compaction;
343   }
344 
345   /**
346    * Only interrupt once it's done with a run through the work loop.
347    */
348   void interruptIfNecessary() {
349     splits.shutdown();
350     mergePool.shutdown();
351     longCompactions.shutdown();
352     shortCompactions.shutdown();
353   }
354 
355   private void waitFor(ThreadPoolExecutor t, String name) {
356     boolean done = false;
357     while (!done) {
358       try {
359         done = t.awaitTermination(60, TimeUnit.SECONDS);
360         LOG.info("Waiting for " + name + " to finish...");
361         if (!done) {
362           t.shutdownNow();
363         }
364       } catch (InterruptedException ie) {
365         LOG.warn("Interrupted waiting for " + name + " to finish...");
366       }
367     }
368   }
369 
370   void join() {
371     waitFor(splits, "Split Thread");
372     waitFor(mergePool, "Merge Thread");
373     waitFor(longCompactions, "Large Compaction Thread");
374     waitFor(shortCompactions, "Small Compaction Thread");
375   }
376 
377   /**
378    * Returns the current size of the queue containing regions that are
379    * processed.
380    *
381    * @return The current size of the regions queue.
382    */
383   public int getCompactionQueueSize() {
384     return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
385   }
386 
387   public int getLargeCompactionQueueSize() {
388     return longCompactions.getQueue().size();
389   }
390 
391 
392   public int getSmallCompactionQueueSize() {
393     return shortCompactions.getQueue().size();
394   }
395 
396 
397   private boolean shouldSplitRegion() {
398     return (regionSplitLimit > server.getNumberOfOnlineRegions());
399   }
400 
401   /**
402    * @return the regionSplitLimit
403    */
404   public int getRegionSplitLimit() {
405     return this.regionSplitLimit;
406   }
407 
408   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
409     private final Store store;
410     private final HRegion region;
411     private CompactionContext compaction;
412     private int queuedPriority;
413     private ThreadPoolExecutor parent;
414 
415     public CompactionRunner(Store store, HRegion region,
416         CompactionContext compaction, ThreadPoolExecutor parent) {
417       super();
418       this.store = store;
419       this.region = region;
420       this.compaction = compaction;
421       this.queuedPriority = (this.compaction == null)
422           ? store.getCompactPriority() : compaction.getRequest().getPriority();
423       this.parent = parent;
424     }
425 
426     @Override
427     public String toString() {
428       return (this.compaction != null) ? ("Request = " + compaction.getRequest())
429           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
430     }
431 
432     @Override
433     public void run() {
434       Preconditions.checkNotNull(server);
435       if (server.isStopped()
436           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
437         return;
438       }
439       // Common case - system compaction without a file selection. Select now.
440       if (this.compaction == null) {
441         int oldPriority = this.queuedPriority;
442         this.queuedPriority = this.store.getCompactPriority();
443         if (this.queuedPriority > oldPriority) {
444           // Store priority decreased while we were in queue (due to some other compaction?),
445           // requeue with new priority to avoid blocking potential higher priorities.
446           this.parent.execute(this);
447           return;
448         }
449         try {
450           this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
451         } catch (IOException ex) {
452           LOG.error("Compaction selection failed " + this, ex);
453           server.checkFileSystem();
454           return;
455         }
456         if (this.compaction == null) return; // nothing to do
457         // Now see if we are in correct pool for the size; if not, go to the correct one.
458         // We might end up waiting for a while, so cancel the selection.
459         assert this.compaction.hasSelection();
460         ThreadPoolExecutor pool = store.throttleCompaction(
461             compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
462         if (this.parent != pool) {
463           this.store.cancelRequestedCompaction(this.compaction);
464           this.compaction = null;
465           this.parent = pool;
466           this.parent.execute(this);
467           return;
468         }
469       }
470       // Finally we can compact something.
471       assert this.compaction != null;
472 
473       this.compaction.getRequest().beforeExecute();
474       try {
475         // Note: please don't put single-compaction logic here;
476         //       put it into region/store/etc. This is CST logic.
477         long start = EnvironmentEdgeManager.currentTime();
478         boolean completed = region.compact(compaction, store);
479         long now = EnvironmentEdgeManager.currentTime();
480         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
481               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
482         if (completed) {
483           // degenerate case: blocked regions require recursive enqueues
484           if (store.getCompactPriority() <= 0) {
485             requestSystemCompaction(region, store, "Recursive enqueue");
486           } else {
487             // see if the compaction has caused us to exceed max region size
488             requestSplit(region);
489           }
490         }
491       } catch (IOException ex) {
492         IOException remoteEx =
493             ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
494         LOG.error("Compaction failed " + this, remoteEx);
495         if (remoteEx != ex) {
496           LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
497         }
498         server.checkFileSystem();
499       } catch (Exception ex) {
500         LOG.error("Compaction failed " + this, ex);
501         server.checkFileSystem();
502       } finally {
503         LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
504       }
505       this.compaction.getRequest().afterExecute();
506     }
507 
508     private String formatStackTrace(Exception ex) {
509       StringWriter sw = new StringWriter();
510       PrintWriter pw = new PrintWriter(sw);
511       ex.printStackTrace(pw);
512       pw.flush();
513       return sw.toString();
514     }
515 
516     @Override
517     public int compareTo(CompactionRunner o) {
518       // Only compare the underlying request (if any), for queue sorting purposes.
519       int compareVal = queuedPriority - o.queuedPriority; // compare priority
520       if (compareVal != 0) return compareVal;
521       CompactionContext tc = this.compaction, oc = o.compaction;
522       // Sort pre-selected (user?) compactions before system ones with equal priority.
523       return (tc == null) ? ((oc == null) ? 0 : 1)
524           : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
525     }
526   }
527 
528   /**
529    * Cleanup class to use when rejecting a compaction request from the queue.
530    */
531   private static class Rejection implements RejectedExecutionHandler {
532     @Override
533     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
534       if (runnable instanceof CompactionRunner) {
535         CompactionRunner runner = (CompactionRunner)runnable;
536         LOG.debug("Compaction Rejected: " + runner);
537         runner.store.cancelRequestedCompaction(runner.compaction);
538       }
539     }
540   }
541 }