View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver.compactions;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.List;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.RejectedExecutionHandler;
28  import java.util.concurrent.ThreadPoolExecutor;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.RemoteExceptionHandler;
35  import org.apache.hadoop.hbase.KeyValue.Type;
36  import org.apache.hadoop.hbase.regionserver.HRegion;
37  import org.apache.hadoop.hbase.regionserver.HRegionServer;
38  import org.apache.hadoop.hbase.regionserver.Store;
39  import org.apache.hadoop.hbase.regionserver.StoreFile;
40  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41  import org.apache.hadoop.util.StringUtils;
42  
43  import com.google.common.base.Function;
44  import com.google.common.base.Joiner;
45  import com.google.common.base.Preconditions;
46  import com.google.common.base.Predicate;
47  import com.google.common.collect.Collections2;
48  
49  /**
50   * This class holds all details necessary to run a compaction.
51   */
52  public class CompactionRequest implements Comparable<CompactionRequest>,
53      Runnable {
54      static final Log LOG = LogFactory.getLog(CompactionRequest.class);
55      private final HRegion r;
56      private final Store s;
57      private CompactSelection compactSelection;
58      private long totalSize;
59      private boolean isMajor;
60      private int p;
61      private final Long timeInNanos;
62      private HRegionServer server = null;
63  
64      /**
65       * Map to track the number of compaction requested per region (id)
66       */
67      private static final ConcurrentHashMap<Long, AtomicInteger>
68        majorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
69      private static final ConcurrentHashMap<Long, AtomicInteger>
70        minorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
71  
72    /**
73     * Create a simple compaction request just for testing - this lets you specify everything you
74     * would need in the general case of testing compactions from an external perspective (e.g.
75     * requesting a compaction through the HRegion).
76     * @param store
77     * @param conf
78     * @param selection
79     * @param isMajor
80     * @return a request that is useful in requesting compactions for testing
81     */
82    public static CompactionRequest getRequestForTesting(Store store, Configuration conf,
83        Collection<StoreFile> selection, boolean isMajor) {
84      return new CompactionRequest(store.getHRegion(), store, new CompactSelection(conf,
85          new ArrayList<StoreFile>(
86          selection)), isMajor, 0, System.nanoTime());
87    }
88  
89    /**
90     * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
91     * compaction before being used. Uses the current system time on creation as the start time.
92     * @param region region that is being compacted
93     * @param store store which is being compacted
94     * @param priority specified priority with which this compaction should enter the queue.
95     */
96    public CompactionRequest(HRegion region, Store store, int priority) {
97      this(region, store, null, false, priority, System.nanoTime());
98    }
99  
100   public CompactionRequest(HRegion r, Store s, CompactSelection files, boolean isMajor, int p) {
101     // delegate to the internal constructor after checking basic preconditions
102     this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System
103         .nanoTime());
104   }
105 
106   private CompactionRequest(HRegion region, Store store, CompactSelection files, boolean isMajor,
107       int priority, long startTime) {
108     this.r = region;
109     this.s = store;
110     this.isMajor = isMajor;
111     this.p = priority;
112     this.timeInNanos = startTime;
113     if (files != null) {
114       this.setSelection(files);
115     }
116   }
117 
118     /**
119      * Find out if a given region in compaction now.
120      *
121      * @param regionId
122      * @return
123      */
124     public static CompactionState getCompactionState(
125         final long regionId) {
126       Long key = Long.valueOf(regionId);
127       AtomicInteger major = majorCompactions.get(key);
128       AtomicInteger minor = minorCompactions.get(key);
129       int state = 0;
130       if (minor != null && minor.get() > 0) {
131         state += 1;  // use 1 to indicate minor here
132       }
133       if (major != null && major.get() > 0) {
134         state += 2;  // use 2 to indicate major here
135       }
136       switch (state) {
137       case 3:  // 3 = 2 + 1, so both major and minor
138         return CompactionState.MAJOR_AND_MINOR;
139       case 2:
140         return CompactionState.MAJOR;
141       case 1:
142         return CompactionState.MINOR;
143       default:
144         return CompactionState.NONE;
145       }
146     }
147 
148     public static void preRequest(final CompactionRequest cr){
149       Long key = Long.valueOf(cr.getHRegion().getRegionId());
150       ConcurrentHashMap<Long, AtomicInteger> compactions =
151         cr.isMajor() ? majorCompactions : minorCompactions;
152       AtomicInteger count = compactions.get(key);
153       if (count == null) {
154         compactions.putIfAbsent(key, new AtomicInteger(0));
155         count = compactions.get(key);
156       }
157       count.incrementAndGet();
158     }
159 
160     public static void postRequest(final CompactionRequest cr){
161       Long key = Long.valueOf(cr.getHRegion().getRegionId());
162       ConcurrentHashMap<Long, AtomicInteger> compactions =
163         cr.isMajor() ? majorCompactions : minorCompactions;
164       AtomicInteger count = compactions.get(key);
165       if (count != null) {
166         count.decrementAndGet();
167       }
168     }
169 
170     public void finishRequest() {
171       this.compactSelection.finishRequest();
172     }
173 
174     /**
175      * This function will define where in the priority queue the request will
176      * end up.  Those with the highest priorities will be first.  When the
177      * priorities are the same it will first compare priority then date
178      * to maintain a FIFO functionality.
179      *
180      * <p>Note: The date is only accurate to the millisecond which means it is
181      * possible that two requests were inserted into the queue within a
182      * millisecond.  When that is the case this function will break the tie
183      * arbitrarily.
184      */
185     @Override
186     public int compareTo(CompactionRequest request) {
187       //NOTE: The head of the priority queue is the least element
188       if (this.equals(request)) {
189         return 0; //they are the same request
190       }
191       int compareVal;
192 
193       compareVal = p - request.p; //compare priority
194       if (compareVal != 0) {
195         return compareVal;
196       }
197 
198       compareVal = timeInNanos.compareTo(request.timeInNanos);
199       if (compareVal != 0) {
200         return compareVal;
201       }
202 
203       // break the tie based on hash code
204       return this.hashCode() - request.hashCode();
205     }
206 
207     /** Gets the HRegion for the request */
208     public HRegion getHRegion() {
209       return r;
210     }
211 
212     /** Gets the Store for the request */
213     public Store getStore() {
214       return s;
215     }
216 
217     /** Gets the compact selection object for the request */
218     public CompactSelection getCompactSelection() {
219       return compactSelection;
220     }
221 
222     /** Gets the StoreFiles for the request */
223     public List<StoreFile> getFiles() {
224       return compactSelection.getFilesToCompact();
225     }
226 
227     /** Gets the total size of all StoreFiles in compaction */
228     public long getSize() {
229       return totalSize;
230     }
231 
232     public boolean isMajor() {
233       return this.isMajor;
234     }
235 
236     /** Gets the priority for the request */
237     public int getPriority() {
238       return p;
239     }
240 
241     /** Gets the priority for the request */
242     public void setPriority(int p) {
243       this.p = p;
244     }
245 
246     public void setServer(HRegionServer hrs) {
247       this.server = hrs;
248     }
249 
250     /**
251      * Set the files (and, implicitly, the size of the compaction based on those files)
252      * @param files files that should be included in the compaction
253      */
254     public void setSelection(CompactSelection files) {
255       long sz = 0;
256       for (StoreFile sf : files.getFilesToCompact()) {
257         sz += sf.getReader().length();
258       }
259       this.totalSize = sz;
260       this.compactSelection = files;
261     }
262 
263     /**
264      * Specify if this compaction should be a major compaction based on the state of the store
265      * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
266      *          compaction
267      */
268     public void setIsMajor(boolean isMajor) {
269       this.isMajor = isMajor;
270     }
271 
272     @Override
273     public String toString() {
274       String fsList = Joiner.on(", ").join(
275           Collections2.transform(Collections2.filter(
276               compactSelection.getFilesToCompact(),
277               new Predicate<StoreFile>() {
278                 public boolean apply(StoreFile sf) {
279                   return sf.getReader() != null;
280                 }
281             }), new Function<StoreFile, String>() {
282               public String apply(StoreFile sf) {
283                 return StringUtils.humanReadableInt(sf.getReader().length());
284               }
285             }));
286 
287       return "regionName=" + r.getRegionNameAsString() +
288         ", storeName=" + new String(s.getFamily().getName()) +
289         ", fileCount=" + compactSelection.getFilesToCompact().size() +
290         ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
291           ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
292         ", priority=" + p + ", time=" + timeInNanos;
293     }
294 
295     @Override
296     public void run() {
297       Preconditions.checkNotNull(server);
298       if (server.isStopped()) {
299         return;
300       }
301       try {
302         long start = EnvironmentEdgeManager.currentTimeMillis();
303         boolean completed = r.compact(this);
304         long now = EnvironmentEdgeManager.currentTimeMillis();
305         LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
306               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
307         if (completed) {
308           server.getMetrics().addCompaction(now - start, this.totalSize);
309           // degenerate case: blocked regions require recursive enqueues
310           if (s.getCompactPriority() <= 0) {
311             server.getCompactSplitThread()
312               .requestCompaction(r, s, "Recursive enqueue", null);
313           } else {
314             // see if the compaction has caused us to exceed max region size
315             server.getCompactSplitThread().requestSplit(r);
316           }
317         }
318       } catch (IOException ex) {
319         LOG.error("Compaction failed " + this, RemoteExceptionHandler
320             .checkIOException(ex));
321         server.checkFileSystem();
322       } catch (Exception ex) {
323         LOG.error("Compaction failed " + this, ex);
324         server.checkFileSystem();
325       } finally {
326         s.finishRequest(this);
327       LOG.debug("CompactSplitThread status: " + server.getCompactSplitThread());
328       }
329     }
330 
331     /**
332      * An enum for the region compaction state
333      */
334     public static enum CompactionState {
335       NONE,
336       MINOR,
337       MAJOR,
338       MAJOR_AND_MINOR;
339     }
340 
341     /**
342      * Cleanup class to use when rejecting a compaction request from the queue.
343      */
344     public static class Rejection implements RejectedExecutionHandler {
345 
346       @Override
347       public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) {
348         if (request instanceof CompactionRequest) {
349           CompactionRequest cr = (CompactionRequest) request;
350           LOG.debug("Compaction Rejected: " + cr);
351           cr.getStore().finishRequest(cr);
352         }
353       }
354     }
355 }