View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.util;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.CompletionService;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorCompletionService;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.lang.RandomStringUtils;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.Path;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.regionserver.HRegion;
46  import org.apache.hadoop.hbase.master.AssignmentManager;
47  import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
48  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
49  import org.apache.hadoop.hbase.wal.WAL;
50  import org.apache.hadoop.hbase.wal.WALFactory;
51  
52  /**
53   * Utility methods for interacting with the regions.
54   */
55  @InterfaceAudience.Private
56  public abstract class ModifyRegionUtils {
57    private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
58  
59    private ModifyRegionUtils() {
60    }
61  
62    public interface RegionFillTask {
63      void fillRegion(final HRegion region) throws IOException;
64    }
65  
66    public interface RegionEditTask {
67      void editRegion(final HRegionInfo region) throws IOException;
68    }
69  
70    /**
71     * Create new set of regions on the specified file-system.
72     * NOTE: that you should add the regions to hbase:meta after this operation.
73     *
74     * @param conf {@link Configuration}
75     * @param rootDir Root directory for HBase instance
76     * @param hTableDescriptor description of the table
77     * @param newRegions {@link HRegionInfo} that describes the regions to create
78     * @param task {@link RegionFillTask} custom code to populate region after creation
79     * @throws IOException
80     */
81    public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
82        final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
83        final RegionFillTask task) throws IOException {
84      if (newRegions == null) return null;
85      int regionNumber = newRegions.length;
86      ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
87          "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
88      try {
89        return createRegions(exec, conf, rootDir, hTableDescriptor, newRegions, task);
90      } finally {
91        exec.shutdownNow();
92      }
93    }
94  
95    /**
96     * Create new set of regions on the specified file-system.
97     * NOTE: that you should add the regions to hbase:meta after this operation.
98     *
99     * @param exec Thread Pool Executor
100    * @param conf {@link Configuration}
101    * @param rootDir Root directory for HBase instance
102    * @param hTableDescriptor description of the table
103    * @param newRegions {@link HRegionInfo} that describes the regions to create
104    * @param task {@link RegionFillTask} custom code to populate region after creation
105    * @throws IOException
106    */
107   public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
108       final Configuration conf, final Path rootDir,
109       final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
110       final RegionFillTask task) throws IOException {
111     if (newRegions == null) return null;
112     int regionNumber = newRegions.length;
113     CompletionService<HRegionInfo> completionService =
114       new ExecutorCompletionService<HRegionInfo>(exec);
115     List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
116     for (final HRegionInfo newRegion : newRegions) {
117       completionService.submit(new Callable<HRegionInfo>() {
118         @Override
119         public HRegionInfo call() throws IOException {
120           return createRegion(conf, rootDir, hTableDescriptor, newRegion, task);
121         }
122       });
123     }
124     try {
125       // wait for all regions to finish creation
126       for (int i = 0; i < regionNumber; i++) {
127         regionInfos.add(completionService.take().get());
128       }
129     } catch (InterruptedException e) {
130       LOG.error("Caught " + e + " during region creation");
131       throw new InterruptedIOException(e.getMessage());
132     } catch (ExecutionException e) {
133       throw new IOException(e);
134     }
135     return regionInfos;
136   }
137 
138   /**
139    * Create new set of regions on the specified file-system.
140    * @param conf {@link Configuration}
141    * @param rootDir Root directory for HBase instance
142    * @param hTableDescriptor description of the table
143    * @param newRegion {@link HRegionInfo} that describes the region to create
144    * @param task {@link RegionFillTask} custom code to populate region after creation
145    * @throws IOException
146    */
147   public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
148       final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
149       final RegionFillTask task) throws IOException {
150     // 1. Create HRegion
151     // The WAL subsystem will use the default rootDir rather than the passed in rootDir
152     // unless I pass along via the conf.
153     Configuration confForWAL = new Configuration(conf);
154     confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
155     WAL wal = (new WALFactory(confForWAL,
156         Collections.<WALActionsListener>singletonList(new MetricsWAL()),
157         "hregion-" + RandomStringUtils.randomNumeric(8))).
158         getWAL(newRegion.getEncodedNameAsBytes());
159     HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, hTableDescriptor, wal, false);
160     try {
161       // 2. Custom user code to interact with the created region
162       if (task != null) {
163         task.fillRegion(region);
164       }
165     } finally {
166       // 3. Close the new region to flush to disk. Close log file too.
167       region.close();
168       wal.close();
169     }
170     return region.getRegionInfo();
171   }
172 
173   /**
174    * Execute the task on the specified set of regions.
175    *
176    * @param exec Thread Pool Executor
177    * @param regions {@link HRegionInfo} that describes the regions to edit
178    * @param task {@link RegionFillTask} custom code to edit the region
179    * @throws IOException
180    */
181   public static void editRegions(final ThreadPoolExecutor exec,
182       final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
183     final ExecutorCompletionService<Void> completionService =
184       new ExecutorCompletionService<Void>(exec);
185     for (final HRegionInfo hri: regions) {
186       completionService.submit(new Callable<Void>() {
187         @Override
188         public Void call() throws IOException {
189           task.editRegion(hri);
190           return null;
191         }
192       });
193     }
194 
195     try {
196       for (HRegionInfo hri: regions) {
197         completionService.take().get();
198       }
199     } catch (InterruptedException e) {
200       throw new InterruptedIOException(e.getMessage());
201     } catch (ExecutionException e) {
202       IOException ex = new IOException();
203       ex.initCause(e.getCause());
204       throw ex;
205     }
206   }
207 
208   /*
209    * used by createRegions() to get the thread pool executor based on the
210    * "hbase.hregion.open.and.init.threads.max" property.
211    */
212   static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
213       final String threadNamePrefix, int regionNumber) {
214     int maxThreads = Math.min(regionNumber, conf.getInt(
215         "hbase.hregion.open.and.init.threads.max", 10));
216     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
217     .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
218         new ThreadFactory() {
219           private int count = 1;
220 
221           @Override
222           public Thread newThread(Runnable r) {
223             Thread t = new Thread(r, threadNamePrefix + "-" + count++);
224             return t;
225           }
226         });
227     return regionOpenAndInitThreadPool;
228   }
229 
230   /**
231    * Triggers a bulk assignment of the specified regions
232    *
233    * @param assignmentManager the Assignment Manger
234    * @param regionInfos the list of regions to assign
235    * @throws IOException if an error occurred during the assignment
236    */
237   public static void assignRegions(final AssignmentManager assignmentManager,
238       final List<HRegionInfo> regionInfos) throws IOException {
239     try {
240       assignmentManager.getRegionStates().createRegionStates(regionInfos);
241       assignmentManager.assign(regionInfos);
242     } catch (InterruptedException e) {
243       LOG.error("Caught " + e + " during round-robin assignment");
244       InterruptedIOException ie = new InterruptedIOException(e.getMessage());
245       ie.initCause(e);
246       throw ie;
247     }
248   }
249 }