View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.util;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.CompletionService;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorCompletionService;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.lang.RandomStringUtils;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.Path;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.regionserver.HRegion;
46  import org.apache.hadoop.hbase.master.AssignmentManager;
47  import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
48  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
49  import org.apache.hadoop.hbase.wal.WAL;
50  import org.apache.hadoop.hbase.wal.WALFactory;
51  
52  /**
53   * Utility methods for interacting with the regions.
54   */
55  @InterfaceAudience.Private
56  public abstract class ModifyRegionUtils {
57    private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
58  
59    private ModifyRegionUtils() {
60    }
61  
62    public interface RegionFillTask {
63      void fillRegion(final HRegion region) throws IOException;
64    }
65  
66    public interface RegionEditTask {
67      void editRegion(final HRegionInfo region) throws IOException;
68    }
69  
70    public static HRegionInfo[] createHRegionInfos(HTableDescriptor hTableDescriptor,
71        byte[][] splitKeys) {
72      long regionId = System.currentTimeMillis();
73      HRegionInfo[] hRegionInfos = null;
74      if (splitKeys == null || splitKeys.length == 0) {
75        hRegionInfos = new HRegionInfo[]{
76          new HRegionInfo(hTableDescriptor.getTableName(), null, null, false, regionId)
77        };
78      } else {
79        int numRegions = splitKeys.length + 1;
80        hRegionInfos = new HRegionInfo[numRegions];
81        byte[] startKey = null;
82        byte[] endKey = null;
83        for (int i = 0; i < numRegions; i++) {
84          endKey = (i == splitKeys.length) ? null : splitKeys[i];
85          hRegionInfos[i] =
86               new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey,
87                   false, regionId);
88          startKey = endKey;
89        }
90      }
91      return hRegionInfos;
92    }
93  
94    /**
95     * Create new set of regions on the specified file-system.
96     * NOTE: that you should add the regions to hbase:meta after this operation.
97     *
98     * @param conf {@link Configuration}
99     * @param rootDir Root directory for HBase instance
100    * @param hTableDescriptor description of the table
101    * @param newRegions {@link HRegionInfo} that describes the regions to create
102    * @param task {@link RegionFillTask} custom code to populate region after creation
103    * @throws IOException
104    */
105   public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
106       final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
107       final RegionFillTask task) throws IOException {
108     if (newRegions == null) return null;
109     int regionNumber = newRegions.length;
110     ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
111         "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
112     try {
113       return createRegions(exec, conf, rootDir, hTableDescriptor, newRegions, task);
114     } finally {
115       exec.shutdownNow();
116     }
117   }
118 
119   /**
120    * Create new set of regions on the specified file-system.
121    * NOTE: that you should add the regions to hbase:meta after this operation.
122    *
123    * @param exec Thread Pool Executor
124    * @param conf {@link Configuration}
125    * @param rootDir Root directory for HBase instance
126    * @param hTableDescriptor description of the table
127    * @param newRegions {@link HRegionInfo} that describes the regions to create
128    * @param task {@link RegionFillTask} custom code to populate region after creation
129    * @throws IOException
130    */
131   public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
132       final Configuration conf, final Path rootDir,
133       final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
134       final RegionFillTask task) throws IOException {
135     if (newRegions == null) return null;
136     int regionNumber = newRegions.length;
137     CompletionService<HRegionInfo> completionService =
138       new ExecutorCompletionService<HRegionInfo>(exec);
139     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
140     for (final HRegionInfo newRegion : newRegions) {
141       completionService.submit(new Callable<HRegionInfo>() {
142         @Override
143         public HRegionInfo call() throws IOException {
144           return createRegion(conf, rootDir, hTableDescriptor, newRegion, task);
145         }
146       });
147     }
148     try {
149       // wait for all regions to finish creation
150       for (int i = 0; i < regionNumber; i++) {
151         regionInfos.add(completionService.take().get());
152       }
153     } catch (InterruptedException e) {
154       LOG.error("Caught " + e + " during region creation");
155       throw new InterruptedIOException(e.getMessage());
156     } catch (ExecutionException e) {
157       throw new IOException(e);
158     }
159     return regionInfos;
160   }
161 
162   /**
163    * Create new set of regions on the specified file-system.
164    * @param conf {@link Configuration}
165    * @param rootDir Root directory for HBase instance
166    * @param hTableDescriptor description of the table
167    * @param newRegion {@link HRegionInfo} that describes the region to create
168    * @param task {@link RegionFillTask} custom code to populate region after creation
169    * @throws IOException
170    */
171   public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
172       final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
173       final RegionFillTask task) throws IOException {
174     // 1. Create HRegion
175     // The WAL subsystem will use the default rootDir rather than the passed in rootDir
176     // unless I pass along via the conf.
177     Configuration confForWAL = new Configuration(conf);
178     confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
179     WAL wal = (new WALFactory(confForWAL,
180         Collections.<WALActionsListener>singletonList(new MetricsWAL()),
181         "hregion-" + RandomStringUtils.randomNumeric(8))).
182         getWAL(newRegion.getEncodedNameAsBytes());
183     HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, hTableDescriptor, wal, false);
184     try {
185       // 2. Custom user code to interact with the created region
186       if (task != null) {
187         task.fillRegion(region);
188       }
189     } finally {
190       // 3. Close the new region to flush to disk. Close log file too.
191       region.close();
192       wal.close();
193     }
194     return region.getRegionInfo();
195   }
196 
197   /**
198    * Execute the task on the specified set of regions.
199    *
200    * @param exec Thread Pool Executor
201    * @param regions {@link HRegionInfo} that describes the regions to edit
202    * @param task {@link RegionFillTask} custom code to edit the region
203    * @throws IOException
204    */
205   public static void editRegions(final ThreadPoolExecutor exec,
206       final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
207     final ExecutorCompletionService<Void> completionService =
208       new ExecutorCompletionService<Void>(exec);
209     for (final HRegionInfo hri: regions) {
210       completionService.submit(new Callable<Void>() {
211         @Override
212         public Void call() throws IOException {
213           task.editRegion(hri);
214           return null;
215         }
216       });
217     }
218 
219     try {
220       for (HRegionInfo hri: regions) {
221         completionService.take().get();
222       }
223     } catch (InterruptedException e) {
224       throw new InterruptedIOException(e.getMessage());
225     } catch (ExecutionException e) {
226       IOException ex = new IOException();
227       ex.initCause(e.getCause());
228       throw ex;
229     }
230   }
231 
232   /*
233    * used by createRegions() to get the thread pool executor based on the
234    * "hbase.hregion.open.and.init.threads.max" property.
235    */
236   static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
237       final String threadNamePrefix, int regionNumber) {
238     int maxThreads = Math.min(regionNumber, conf.getInt(
239         "hbase.hregion.open.and.init.threads.max", 10));
240     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
241     .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
242         new ThreadFactory() {
243           private int count = 1;
244 
245           @Override
246           public Thread newThread(Runnable r) {
247             Thread t = new Thread(r, threadNamePrefix + "-" + count++);
248             return t;
249           }
250         });
251     return regionOpenAndInitThreadPool;
252   }
253 
254   /**
255    * Triggers a bulk assignment of the specified regions
256    *
257    * @param assignmentManager the Assignment Manger
258    * @param regionInfos the list of regions to assign
259    * @throws IOException if an error occurred during the assignment
260    */
261   public static void assignRegions(final AssignmentManager assignmentManager,
262       final List<HRegionInfo> regionInfos) throws IOException {
263     try {
264       assignmentManager.getRegionStates().createRegionStates(regionInfos);
265       assignmentManager.assign(regionInfos);
266     } catch (InterruptedException e) {
267       LOG.error("Caught " + e + " during round-robin assignment");
268       InterruptedIOException ie = new InterruptedIOException(e.getMessage());
269       ie.initCause(e);
270       throw ie;
271     }
272   }
273 }