001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.Callable;
027import java.util.concurrent.CompletionService;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorCompletionService;
030import java.util.concurrent.Future;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HRegionInfo;
041import org.apache.hadoop.hbase.HTableDescriptor;
042import org.apache.hadoop.hbase.regionserver.HRegion;
043
044/**
045 * Utility methods for interacting with the regions.
046 */
047@InterfaceAudience.Private
048public abstract class ModifyRegionUtils {
049  private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
050
051  private ModifyRegionUtils() {
052  }
053
054  public interface RegionFillTask {
055    public void fillRegion(final HRegion region) throws IOException;
056  }
057
058  /**
059   * Create new set of regions on the specified file-system.
060   * NOTE: that you should add the regions to .META. after this operation.
061   *
062   * @param conf {@link Configuration}
063   * @param rootDir Root directory for HBase instance
064   * @param hTableDescriptor description of the table
065   * @param newRegions {@link HRegionInfo} that describes the regions to create
066   * @throws IOException
067   */
068  public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
069      final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions) throws IOException {
070    return createRegions(conf, rootDir, hTableDescriptor, newRegions, null);
071  }
072
073  /**
074   * Create new set of regions on the specified file-system.
075   * NOTE: that you should add the regions to .META. after this operation.
076   *
077   * @param conf {@link Configuration}
078   * @param rootDir Root directory for HBase instance
079   * @param hTableDescriptor description of the table
080   * @param newRegions {@link HRegionInfo} that describes the regions to create
081   * @param task {@link RegionFillTask} custom code to populate region after creation
082   * @throws IOException
083   */
084  public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
085      final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
086      final RegionFillTask task) throws IOException {
087    if (newRegions == null) return null;
088    int regionNumber = newRegions.length;
089    ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
090        "RegionOpenAndInitThread-" + hTableDescriptor.getNameAsString(), regionNumber);
091    CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
092        regionOpenAndInitThreadPool);
093    List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
094    for (final HRegionInfo newRegion : newRegions) {
095      completionService.submit(new Callable<HRegionInfo>() {
096        public HRegionInfo call() throws IOException {
097          // 1. Create HRegion
098          HRegion region = HRegion.createHRegion(newRegion,
099              rootDir, conf, hTableDescriptor, null,
100              false, true);
101          try {
102            // 2. Custom user code to interact with the created region
103            if (task != null) {
104              task.fillRegion(region);
105            }
106          } finally {
107            // 3. Close the new region to flush to disk. Close log file too.
108            region.close();
109          }
110          return region.getRegionInfo();
111        }
112      });
113    }
114    try {
115      // 4. wait for all regions to finish creation
116      for (int i = 0; i < regionNumber; i++) {
117        Future<HRegionInfo> future = completionService.take();
118        HRegionInfo regionInfo = future.get();
119        regionInfos.add(regionInfo);
120      }
121    } catch (InterruptedException e) {
122      LOG.error("Caught " + e + " during region creation");
123      throw new InterruptedIOException(e.getMessage());
124    } catch (ExecutionException e) {
125      throw new IOException(e);
126    } finally {
127      regionOpenAndInitThreadPool.shutdownNow();
128    }
129    return regionInfos;
130  }
131
132  /*
133   * used by createRegions() to get the thread pool executor based on the
134   * "hbase.hregion.open.and.init.threads.max" property.
135   */
136  static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
137      final String threadNamePrefix, int regionNumber) {
138    int maxThreads = Math.min(regionNumber, conf.getInt(
139        "hbase.hregion.open.and.init.threads.max", 10));
140    ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
141    .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
142        new ThreadFactory() {
143          private int count = 1;
144
145          public Thread newThread(Runnable r) {
146            Thread t = new Thread(r, threadNamePrefix + "-" + count++);
147            return t;
148          }
149        });
150    return regionOpenAndInitThreadPool;
151  }
152}