View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.util;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CompletionService;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorCompletionService;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.regionserver.HRegion;
44  import org.apache.hadoop.hbase.master.AssignmentManager;
45  
46  /**
47   * Utility methods for interacting with the regions.
48   */
49  @InterfaceAudience.Private
50  public abstract class ModifyRegionUtils {
51    private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
52  
53    private ModifyRegionUtils() {
54    }
55  
56    public interface RegionFillTask {
57      void fillRegion(final HRegion region) throws IOException;
58    }
59  
60    public interface RegionEditTask {
61      void editRegion(final HRegionInfo region) throws IOException;
62    }
63  
64    public static HRegionInfo[] createHRegionInfos(HTableDescriptor hTableDescriptor,
65        byte[][] splitKeys) {
66      long regionId = System.currentTimeMillis();
67      HRegionInfo[] hRegionInfos = null;
68      if (splitKeys == null || splitKeys.length == 0) {
69        hRegionInfos = new HRegionInfo[]{
70          new HRegionInfo(hTableDescriptor.getTableName(), null, null, false, regionId)
71        };
72      } else {
73        int numRegions = splitKeys.length + 1;
74        hRegionInfos = new HRegionInfo[numRegions];
75        byte[] startKey = null;
76        byte[] endKey = null;
77        for (int i = 0; i < numRegions; i++) {
78          endKey = (i == splitKeys.length) ? null : splitKeys[i];
79          hRegionInfos[i] =
80               new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey,
81                   false, regionId);
82          startKey = endKey;
83        }
84      }
85      return hRegionInfos;
86    }
87  
88    /**
89     * Create new set of regions on the specified file-system.
90     * NOTE: that you should add the regions to hbase:meta after this operation.
91     *
92     * @param conf {@link Configuration}
93     * @param rootDir Root directory for HBase instance
94     * @param hTableDescriptor description of the table
95     * @param newRegions {@link HRegionInfo} that describes the regions to create
96     * @param task {@link RegionFillTask} custom code to populate region after creation
97     * @throws IOException
98     */
99    public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
100       final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
101       final RegionFillTask task) throws IOException {
102     if (newRegions == null) return null;
103     int regionNumber = newRegions.length;
104     ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
105         "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
106     try {
107       return createRegions(exec, conf, rootDir, hTableDescriptor, newRegions, task);
108     } finally {
109       exec.shutdownNow();
110     }
111   }
112 
113   /**
114    * Create new set of regions on the specified file-system.
115    * NOTE: that you should add the regions to hbase:meta after this operation.
116    *
117    * @param exec Thread Pool Executor
118    * @param conf {@link Configuration}
119    * @param rootDir Root directory for HBase instance
120    * @param hTableDescriptor description of the table
121    * @param newRegions {@link HRegionInfo} that describes the regions to create
122    * @param task {@link RegionFillTask} custom code to populate region after creation
123    * @throws IOException
124    */
125   public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
126       final Configuration conf, final Path rootDir,
127       final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
128       final RegionFillTask task) throws IOException {
129     if (newRegions == null) return null;
130     int regionNumber = newRegions.length;
131     CompletionService<HRegionInfo> completionService =
132       new ExecutorCompletionService<HRegionInfo>(exec);
133     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
134     for (final HRegionInfo newRegion : newRegions) {
135       completionService.submit(new Callable<HRegionInfo>() {
136         @Override
137         public HRegionInfo call() throws IOException {
138           return createRegion(conf, rootDir, hTableDescriptor, newRegion, task);
139         }
140       });
141     }
142     try {
143       // wait for all regions to finish creation
144       for (int i = 0; i < regionNumber; i++) {
145         regionInfos.add(completionService.take().get());
146       }
147     } catch (InterruptedException e) {
148       LOG.error("Caught " + e + " during region creation");
149       throw new InterruptedIOException(e.getMessage());
150     } catch (ExecutionException e) {
151       throw new IOException(e);
152     }
153     return regionInfos;
154   }
155 
156   /**
157    * Create new set of regions on the specified file-system.
158    * @param conf {@link Configuration}
159    * @param rootDir Root directory for HBase instance
160    * @param hTableDescriptor description of the table
161    * @param newRegion {@link HRegionInfo} that describes the region to create
162    * @param task {@link RegionFillTask} custom code to populate region after creation
163    * @throws IOException
164    */
165   public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
166       final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
167       final RegionFillTask task) throws IOException {
168     // 1. Create HRegion
169     // The WAL subsystem will use the default rootDir rather than the passed in rootDir
170     // unless I pass along via the conf.
171     Configuration confForWAL = new Configuration(conf);
172     confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
173     HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, hTableDescriptor, null, false);
174     try {
175       // 2. Custom user code to interact with the created region
176       if (task != null) {
177         task.fillRegion(region);
178       }
179     } finally {
180       // 3. Close the new region to flush to disk. Close log file too.
181       region.close();
182     }
183     return region.getRegionInfo();
184   }
185 
186   /**
187    * Execute the task on the specified set of regions.
188    *
189    * @param exec Thread Pool Executor
190    * @param regions {@link HRegionInfo} that describes the regions to edit
191    * @param task {@link RegionFillTask} custom code to edit the region
192    * @throws IOException
193    */
194   public static void editRegions(final ThreadPoolExecutor exec,
195       final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
196     final ExecutorCompletionService<Void> completionService =
197       new ExecutorCompletionService<Void>(exec);
198     for (final HRegionInfo hri: regions) {
199       completionService.submit(new Callable<Void>() {
200         @Override
201         public Void call() throws IOException {
202           task.editRegion(hri);
203           return null;
204         }
205       });
206     }
207 
208     try {
209       for (HRegionInfo hri: regions) {
210         completionService.take().get();
211       }
212     } catch (InterruptedException e) {
213       throw new InterruptedIOException(e.getMessage());
214     } catch (ExecutionException e) {
215       IOException ex = new IOException();
216       ex.initCause(e.getCause());
217       throw ex;
218     }
219   }
220 
221   /*
222    * used by createRegions() to get the thread pool executor based on the
223    * "hbase.hregion.open.and.init.threads.max" property.
224    */
225   static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
226       final String threadNamePrefix, int regionNumber) {
227     int maxThreads = Math.min(regionNumber, conf.getInt(
228         "hbase.hregion.open.and.init.threads.max", 10));
229     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
230     .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
231         new ThreadFactory() {
232           private int count = 1;
233 
234           @Override
235           public Thread newThread(Runnable r) {
236             Thread t = new Thread(r, threadNamePrefix + "-" + count++);
237             return t;
238           }
239         });
240     return regionOpenAndInitThreadPool;
241   }
242 
243   /**
244    * Triggers a bulk assignment of the specified regions
245    *
246    * @param assignmentManager the Assignment Manger
247    * @param regionInfos the list of regions to assign
248    * @throws IOException if an error occurred during the assignment
249    */
250   public static void assignRegions(final AssignmentManager assignmentManager,
251       final List<HRegionInfo> regionInfos) throws IOException {
252     try {
253       assignmentManager.getRegionStates().createRegionStates(regionInfos);
254       assignmentManager.assign(regionInfos);
255     } catch (InterruptedException e) {
256       LOG.error("Caught " + e + " during round-robin assignment");
257       InterruptedIOException ie = new InterruptedIOException(e.getMessage());
258       ie.initCause(e);
259       throw ie;
260     }
261   }
262 }