001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.List;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CompletionService;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorCompletionService;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.client.RegionInfoBuilder;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.regionserver.HRegion;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Utility methods for interacting with the regions.
047 */
048@InterfaceAudience.Private
049public abstract class ModifyRegionUtils {
050  private static final Logger LOG = LoggerFactory.getLogger(ModifyRegionUtils.class);
051
052  private ModifyRegionUtils() {
053  }
054
055  public interface RegionFillTask {
056    void fillRegion(final HRegion region) throws IOException;
057  }
058
059  public interface RegionEditTask {
060    void editRegion(final RegionInfo region) throws IOException;
061  }
062
063  public static RegionInfo[] createRegionInfos(TableDescriptor tableDescriptor,
064      byte[][] splitKeys) {
065    long regionId = System.currentTimeMillis();
066    RegionInfo[] hRegionInfos = null;
067    if (splitKeys == null || splitKeys.length == 0) {
068      hRegionInfos = new RegionInfo[]{
069          RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
070           .setStartKey(null)
071           .setEndKey(null)
072           .setSplit(false)
073           .setRegionId(regionId)
074           .build()
075      };
076    } else {
077      int numRegions = splitKeys.length + 1;
078      hRegionInfos = new RegionInfo[numRegions];
079      byte[] startKey = null;
080      byte[] endKey = null;
081      for (int i = 0; i < numRegions; i++) {
082        endKey = (i == splitKeys.length) ? null : splitKeys[i];
083        hRegionInfos[i] =
084            RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
085                .setStartKey(startKey)
086                .setEndKey(endKey)
087                .setSplit(false)
088                .setRegionId(regionId)
089                .build();
090        startKey = endKey;
091      }
092    }
093    return hRegionInfos;
094  }
095
096  /**
097   * Create new set of regions on the specified file-system.
098   * NOTE: that you should add the regions to hbase:meta after this operation.
099   *
100   * @param conf {@link Configuration}
101   * @param rootDir Root directory for HBase instance
102   * @param tableDescriptor description of the table
103   * @param newRegions {@link RegionInfo} that describes the regions to create
104   * @param task {@link RegionFillTask} custom code to populate region after creation
105   * @throws IOException
106   */
107  public static List<RegionInfo> createRegions(final Configuration conf, final Path rootDir,
108      final TableDescriptor tableDescriptor, final RegionInfo[] newRegions,
109      final RegionFillTask task) throws IOException {
110    if (newRegions == null) return null;
111    int regionNumber = newRegions.length;
112    ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
113        "RegionOpenAndInitThread-" + tableDescriptor.getTableName(), regionNumber);
114    try {
115      return createRegions(exec, conf, rootDir, tableDescriptor, newRegions, task);
116    } finally {
117      exec.shutdownNow();
118    }
119  }
120
121  /**
122   * Create new set of regions on the specified file-system.
123   * NOTE: that you should add the regions to hbase:meta after this operation.
124   *
125   * @param exec Thread Pool Executor
126   * @param conf {@link Configuration}
127   * @param rootDir Root directory for HBase instance
128   * @param tableDescriptor description of the table
129   * @param newRegions {@link RegionInfo} that describes the regions to create
130   * @param task {@link RegionFillTask} custom code to populate region after creation
131   * @throws IOException
132   */
133  public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec,
134                                                final Configuration conf, final Path rootDir,
135                                                final TableDescriptor tableDescriptor, final RegionInfo[] newRegions,
136                                                final RegionFillTask task) throws IOException {
137    if (newRegions == null) return null;
138    int regionNumber = newRegions.length;
139    CompletionService<RegionInfo> completionService = new ExecutorCompletionService<>(exec);
140    List<RegionInfo> regionInfos = new ArrayList<>();
141    for (final RegionInfo newRegion : newRegions) {
142      completionService.submit(new Callable<RegionInfo>() {
143        @Override
144        public RegionInfo call() throws IOException {
145          return createRegion(conf, rootDir, tableDescriptor, newRegion, task);
146        }
147      });
148    }
149    try {
150      // wait for all regions to finish creation
151      for (int i = 0; i < regionNumber; i++) {
152        regionInfos.add(completionService.take().get());
153      }
154    } catch (InterruptedException e) {
155      LOG.error("Caught " + e + " during region creation");
156      throw new InterruptedIOException(e.getMessage());
157    } catch (ExecutionException e) {
158      throw new IOException(e);
159    }
160    return regionInfos;
161  }
162
163  /**
164   * Create new set of regions on the specified file-system.
165   * @param conf {@link Configuration}
166   * @param rootDir Root directory for HBase instance
167   * @param tableDescriptor description of the table
168   * @param newRegion {@link RegionInfo} that describes the region to create
169   * @param task {@link RegionFillTask} custom code to populate region after creation
170   * @throws IOException
171   */
172  public static RegionInfo createRegion(final Configuration conf, final Path rootDir,
173      final TableDescriptor tableDescriptor, final RegionInfo newRegion,
174      final RegionFillTask task) throws IOException {
175    // 1. Create HRegion
176    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
177    // unless I pass along via the conf.
178    Configuration confForWAL = new Configuration(conf);
179    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
180    HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, tableDescriptor, null, false);
181    try {
182      // 2. Custom user code to interact with the created region
183      if (task != null) {
184        task.fillRegion(region);
185      }
186    } finally {
187      // 3. Close the new region to flush to disk. Close log file too.
188      region.close();
189    }
190    return region.getRegionInfo();
191  }
192
193  /**
194   * Execute the task on the specified set of regions.
195   *
196   * @param exec Thread Pool Executor
197   * @param regions {@link RegionInfo} that describes the regions to edit
198   * @param task {@link RegionFillTask} custom code to edit the region
199   * @throws IOException
200   */
201  public static void editRegions(final ThreadPoolExecutor exec,
202      final Collection<RegionInfo> regions, final RegionEditTask task) throws IOException {
203    final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(exec);
204    for (final RegionInfo hri: regions) {
205      completionService.submit(new Callable<Void>() {
206        @Override
207        public Void call() throws IOException {
208          task.editRegion(hri);
209          return null;
210        }
211      });
212    }
213
214    try {
215      for (RegionInfo hri: regions) {
216        completionService.take().get();
217      }
218    } catch (InterruptedException e) {
219      throw new InterruptedIOException(e.getMessage());
220    } catch (ExecutionException e) {
221      IOException ex = new IOException();
222      ex.initCause(e.getCause());
223      throw ex;
224    }
225  }
226
227  /*
228   * used by createRegions() to get the thread pool executor based on the
229   * "hbase.hregion.open.and.init.threads.max" property.
230   */
231  static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
232      final String threadNamePrefix, int regionNumber) {
233    int maxThreads = Math.min(regionNumber, conf.getInt(
234        "hbase.hregion.open.and.init.threads.max", 16));
235    ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
236    .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
237        Threads.newDaemonThreadFactory(threadNamePrefix));
238    return regionOpenAndInitThreadPool;
239  }
240}