View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.util;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.CompletionService;
28  import java.util.concurrent.ExecutionException;
29  import java.util.concurrent.ExecutorCompletionService;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.classification.InterfaceAudience;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.regionserver.HRegion;
43  
44  /**
45   * Utility methods for interacting with the regions.
46   */
47  @InterfaceAudience.Private
48  public abstract class ModifyRegionUtils {
49    private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
50  
51    private ModifyRegionUtils() {
52    }
53  
54    public interface RegionFillTask {
55      public void fillRegion(final HRegion region) throws IOException;
56    }
57  
58    /**
59     * Create new set of regions on the specified file-system.
60     * NOTE: that you should add the regions to .META. after this operation.
61     *
62     * @param conf {@link Configuration}
63     * @param rootDir Root directory for HBase instance
64     * @param hTableDescriptor description of the table
65     * @param newRegions {@link HRegionInfo} that describes the regions to create
66     * @throws IOException
67     */
68    public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
69        final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions) throws IOException {
70      return createRegions(conf, rootDir, hTableDescriptor, newRegions, null);
71    }
72  
73    /**
74     * Create new set of regions on the specified file-system.
75     * NOTE: that you should add the regions to .META. after this operation.
76     *
77     * @param conf {@link Configuration}
78     * @param rootDir Root directory for HBase instance
79     * @param hTableDescriptor description of the table
80     * @param newRegions {@link HRegionInfo} that describes the regions to create
81     * @param task {@link RegionFillTask} custom code to populate region after creation
82     * @throws IOException
83     */
84    public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
85        final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
86        final RegionFillTask task) throws IOException {
87      if (newRegions == null) return null;
88      int regionNumber = newRegions.length;
89      ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
90          "RegionOpenAndInitThread-" + hTableDescriptor.getNameAsString(), regionNumber);
91      CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
92          regionOpenAndInitThreadPool);
93      List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
94      for (final HRegionInfo newRegion : newRegions) {
95        completionService.submit(new Callable<HRegionInfo>() {
96          public HRegionInfo call() throws IOException {
97            // 1. Create HRegion
98            HRegion region = HRegion.createHRegion(newRegion,
99                rootDir, conf, hTableDescriptor, null,
100               false, true);
101           try {
102             // 2. Custom user code to interact with the created region
103             if (task != null) {
104               task.fillRegion(region);
105             }
106           } finally {
107             // 3. Close the new region to flush to disk. Close log file too.
108             region.close();
109           }
110           return region.getRegionInfo();
111         }
112       });
113     }
114     try {
115       // 4. wait for all regions to finish creation
116       for (int i = 0; i < regionNumber; i++) {
117         Future<HRegionInfo> future = completionService.take();
118         HRegionInfo regionInfo = future.get();
119         regionInfos.add(regionInfo);
120       }
121     } catch (InterruptedException e) {
122       LOG.error("Caught " + e + " during region creation");
123       throw new InterruptedIOException(e.getMessage());
124     } catch (ExecutionException e) {
125       throw new IOException(e);
126     } finally {
127       regionOpenAndInitThreadPool.shutdownNow();
128     }
129     return regionInfos;
130   }
131 
132   /*
133    * used by createRegions() to get the thread pool executor based on the
134    * "hbase.hregion.open.and.init.threads.max" property.
135    */
136   static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
137       final String threadNamePrefix, int regionNumber) {
138     int maxThreads = Math.min(regionNumber, conf.getInt(
139         "hbase.hregion.open.and.init.threads.max", 10));
140     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
141     .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
142         new ThreadFactory() {
143           private int count = 1;
144 
145           public Thread newThread(Runnable r) {
146             Thread t = new Thread(r, threadNamePrefix + "-" + count++);
147             return t;
148           }
149         });
150     return regionOpenAndInitThreadPool;
151   }
152 }