View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.util;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CompletionService;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorCompletionService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.classification.InterfaceAudience;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.regionserver.HRegion;
44  import org.apache.hadoop.hbase.master.AssignmentManager;
45  
46  /**
47   * Utility methods for interacting with the regions.
48   */
49  @InterfaceAudience.Private
50  public abstract class ModifyRegionUtils {
51    private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
52  
53    private ModifyRegionUtils() {
54    }
55  
56    public interface RegionFillTask {
57      void fillRegion(final HRegion region) throws IOException;
58    }
59  
60    public interface RegionEditTask {
61      void editRegion(final HRegionInfo region) throws IOException;
62    }
63  
64    /**
65     * Create new set of regions on the specified file-system.
66     * NOTE: that you should add the regions to hbase:meta after this operation.
67     *
68     * @param conf {@link Configuration}
69     * @param rootDir Root directory for HBase instance
70     * @param hTableDescriptor description of the table
71     * @param newRegions {@link HRegionInfo} that describes the regions to create
72     * @throws IOException
73     */
74    public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
75        final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions) throws IOException {
76      return createRegions(conf, rootDir, hTableDescriptor, newRegions, null);
77    }
78  
79    /**
80     * Create new set of regions on the specified file-system.
81     * NOTE: that you should add the regions to hbase:meta after this operation.
82     *
83     * @param conf {@link Configuration}
84     * @param rootDir Root directory for HBase instance
85     * @param hTableDescriptor description of the table
86     * @param newRegions {@link HRegionInfo} that describes the regions to create
87     * @param task {@link RegionFillTask} custom code to populate region after creation
88     * @throws IOException
89     */
90    public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
91        final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
92        final RegionFillTask task) throws IOException {
93  
94        Path tableDir = FSUtils.getTableDir(rootDir, hTableDescriptor.getTableName());
95        return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
96    }
97  
98    /**
99     * Create new set of regions on the specified file-system.
100    * NOTE: that you should add the regions to hbase:meta after this operation.
101    *
102    * @param conf {@link Configuration}
103    * @param rootDir Root directory for HBase instance
104    * @param tableDir table directory
105    * @param hTableDescriptor description of the table
106    * @param newRegions {@link HRegionInfo} that describes the regions to create
107    * @param task {@link RegionFillTask} custom code to populate region after creation
108    * @throws IOException
109    */
110   public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
111       final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
112       final RegionFillTask task) throws IOException {
113     if (newRegions == null) return null;
114     int regionNumber = newRegions.length;
115     ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
116         "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
117     try {
118       return createRegions(exec, conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
119     } finally {
120       exec.shutdownNow();
121     }
122   }
123 
124   /**
125    * Create new set of regions on the specified file-system.
126    * NOTE: that you should add the regions to hbase:meta after this operation.
127    *
128    * @param exec Thread Pool Executor
129    * @param conf {@link Configuration}
130    * @param rootDir Root directory for HBase instance
131    * @param tableDir table directory
132    * @param hTableDescriptor description of the table
133    * @param newRegions {@link HRegionInfo} that describes the regions to create
134    * @param task {@link RegionFillTask} custom code to populate region after creation
135    * @throws IOException
136    */
137   public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
138       final Configuration conf, final Path rootDir, final Path tableDir,
139       final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
140       final RegionFillTask task) throws IOException {
141     if (newRegions == null) return null;
142     int regionNumber = newRegions.length;
143     CompletionService<HRegionInfo> completionService =
144       new ExecutorCompletionService<HRegionInfo>(exec);
145     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
146     for (final HRegionInfo newRegion : newRegions) {
147       completionService.submit(new Callable<HRegionInfo>() {
148         @Override
149         public HRegionInfo call() throws IOException {
150           return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task);
151         }
152       });
153     }
154     try {
155       // wait for all regions to finish creation
156       for (int i = 0; i < regionNumber; i++) {
157         Future<HRegionInfo> future = completionService.take();
158         HRegionInfo regionInfo = future.get();
159         regionInfos.add(regionInfo);
160       }
161     } catch (InterruptedException e) {
162       LOG.error("Caught " + e + " during region creation");
163       throw new InterruptedIOException(e.getMessage());
164     } catch (ExecutionException e) {
165       throw new IOException(e);
166     }
167     return regionInfos;
168   }
169 
170   /**
171    * Create new set of regions on the specified file-system.
172    * @param conf {@link Configuration}
173    * @param rootDir Root directory for HBase instance
174    * @param tableDir table directory
175    * @param hTableDescriptor description of the table
176    * @param newRegion {@link HRegionInfo} that describes the region to create
177    * @param task {@link RegionFillTask} custom code to populate region after creation
178    * @throws IOException
179    */
180   public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
181       final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
182       final RegionFillTask task) throws IOException {
183     // 1. Create HRegion
184     HRegion region = HRegion.createHRegion(newRegion,
185       rootDir, tableDir, conf, hTableDescriptor, null,
186       false, true);
187     try {
188       // 2. Custom user code to interact with the created region
189       if (task != null) {
190         task.fillRegion(region);
191       }
192     } finally {
193       // 3. Close the new region to flush to disk. Close log file too.
194       region.close();
195     }
196     return region.getRegionInfo();
197   }
198 
199   /**
200    * Execute the task on the specified set of regions.
201    *
202    * @param exec Thread Pool Executor
203    * @param regions {@link HRegionInfo} that describes the regions to edit
204    * @param task {@link RegionFillTask} custom code to edit the region
205    * @throws IOException
206    */
207   public static void editRegions(final ThreadPoolExecutor exec,
208       final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
209     final ExecutorCompletionService<Void> completionService =
210       new ExecutorCompletionService<Void>(exec);
211     for (final HRegionInfo hri: regions) {
212       completionService.submit(new Callable<Void>() {
213         @Override
214         public Void call() throws IOException {
215           task.editRegion(hri);
216           return null;
217         }
218       });
219     }
220 
221     try {
222       for (HRegionInfo hri: regions) {
223         completionService.take().get();
224       }
225     } catch (InterruptedException e) {
226       throw new InterruptedIOException(e.getMessage());
227     } catch (ExecutionException e) {
228       IOException ex = new IOException();
229       ex.initCause(e.getCause());
230       throw ex;
231     }
232   }
233 
234   /*
235    * used by createRegions() to get the thread pool executor based on the
236    * "hbase.hregion.open.and.init.threads.max" property.
237    */
238   static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
239       final String threadNamePrefix, int regionNumber) {
240     int maxThreads = Math.min(regionNumber, conf.getInt(
241         "hbase.hregion.open.and.init.threads.max", 10));
242     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
243     .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
244         new ThreadFactory() {
245           private int count = 1;
246 
247           @Override
248           public Thread newThread(Runnable r) {
249             Thread t = new Thread(r, threadNamePrefix + "-" + count++);
250             return t;
251           }
252         });
253     return regionOpenAndInitThreadPool;
254   }
255 
256   /**
257    * Triggers a bulk assignment of the specified regions
258    *
259    * @param assignmentManager the Assignment Manger
260    * @param regionInfos the list of regions to assign
261    * @throws IOException if an error occurred during the assignment
262    */
263   public static void assignRegions(final AssignmentManager assignmentManager,
264       final List<HRegionInfo> regionInfos) throws IOException {
265     try {
266       assignmentManager.getRegionStates().createRegionStates(regionInfos);
267       assignmentManager.assign(regionInfos);
268     } catch (InterruptedException e) {
269       LOG.error("Caught " + e + " during round-robin assignment");
270       InterruptedIOException ie = new InterruptedIOException(e.getMessage());
271       ie.initCause(e);
272       throw ie;
273     }
274   }
275 }