/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Utility methods for creating and modifying table regions.
 */
@InterfaceAudience.Private
public abstract class ModifyRegionUtils {
  private static final Logger LOG = LoggerFactory.getLogger(ModifyRegionUtils.class);

  private ModifyRegionUtils() {
  }

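  /**
   * Callback used by the {@code createRegions} methods: custom code run against each newly created
   * {@link HRegion} so the caller can populate it before the region is closed.
   */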
  public interface RegionFillTask {
    void fillRegion(final HRegion region) throws IOException;
  }

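  /**
   * Callback used by {@link #editRegions}: custom code run for each {@link RegionInfo} in the
   * supplied set.
   */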
  public interface RegionEditTask {
    void editRegion(final RegionInfo region) throws IOException;
  }

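  /**
   * Compute the {@link RegionInfo} set for a new table. With no split keys a single region
   * covering the whole key space is returned; with {@code n} split keys, {@code n + 1} regions are
   * returned, all sharing the same region id (the creation timestamp).
   * <p>
   * For example (illustrative only; {@code tableDescriptor} is assumed to be supplied by the
   * caller):
   *
   * <pre>{@code
   * // Three regions: [, b), [b, m) and [m, )
   * RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(tableDescriptor,
   *   new byte[][] { Bytes.toBytes("b"), Bytes.toBytes("m") });
   * }</pre>
   */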
  public static RegionInfo[] createRegionInfos(TableDescriptor tableDescriptor,
    byte[][] splitKeys) {
    long regionId = EnvironmentEdgeManager.currentTime();
    RegionInfo[] hRegionInfos = null;
    if (splitKeys == null || splitKeys.length == 0) {
      hRegionInfos = new RegionInfo[] { RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
        .setStartKey(null).setEndKey(null).setSplit(false).setRegionId(regionId).build() };
    } else {
      int numRegions = splitKeys.length + 1;
      hRegionInfos = new RegionInfo[numRegions];
      byte[] startKey = null;
      byte[] endKey = null;
      for (int i = 0; i < numRegions; i++) {
        endKey = (i == splitKeys.length) ? null : splitKeys[i];
        hRegionInfos[i] = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
          .setStartKey(startKey).setEndKey(endKey).setSplit(false).setRegionId(regionId).build();
        startKey = endKey;
      }
    }
    return hRegionInfos;
  }

  /**
   * Create a new set of regions on the specified file-system. Note that the regions still need to
   * be added to hbase:meta after this call.
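   * <p>
   * For example (an illustrative sketch; {@code env}, {@code rootDir}, {@code tableDescriptor} and
   * {@code splitKeys} are assumed to come from the calling procedure):
   *
   * <pre>{@code
   * RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(tableDescriptor, splitKeys);
   * List<RegionInfo> created =
   *   ModifyRegionUtils.createRegions(env, rootDir, tableDescriptor, newRegions, null);
   * }</pre>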
   * @param env             {@link MasterProcedureEnv}
   * @param rootDir         Root directory for HBase instance
   * @param tableDescriptor description of the table
   * @param newRegions      {@link RegionInfo} that describes the regions to create
   * @param task            {@link RegionFillTask} custom code to populate region after creation
   * @return the {@link RegionInfo} for the regions that were created
   */
  public static List<RegionInfo> createRegions(final MasterProcedureEnv env, final Path rootDir,
    final TableDescriptor tableDescriptor, final RegionInfo[] newRegions, final RegionFillTask task)
    throws IOException {
    if (newRegions == null) return null;
    int regionNumber = newRegions.length;
    ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(env.getMasterConfiguration(),
      "RegionOpenAndInit-" + tableDescriptor.getTableName(), regionNumber);
    try {
      return createRegions(exec, env.getMasterConfiguration(), env, rootDir, tableDescriptor,
        newRegions, task);
    } finally {
      exec.shutdownNow();
    }
  }

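  /**
   * Create a new set of regions on the specified file-system, using the supplied thread pool and
   * configuration and no {@link MasterProcedureEnv}. Note that the regions still need to be added
   * to hbase:meta after this call.
   */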
  public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec,
    final Configuration conf, final Path rootDir, final TableDescriptor tableDescriptor,
    final RegionInfo[] newRegions, final RegionFillTask task) throws IOException {
    return createRegions(exec, conf, null, rootDir, tableDescriptor, newRegions, task);
  }

  /**
   * Create a new set of regions on the specified file-system. Note that the regions still need to
   * be added to hbase:meta after this call.
   * @param exec            Thread Pool Executor
   * @param conf            {@link Configuration}
   * @param env             {@link MasterProcedureEnv}; may be null
   * @param rootDir         Root directory for HBase instance
   * @param tableDescriptor description of the table
   * @param newRegions      {@link RegionInfo} that describes the regions to create
   * @param task            {@link RegionFillTask} custom code to populate region after creation
   * @return the {@link RegionInfo} for the regions that were created
   */
  public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec,
    final Configuration conf, final MasterProcedureEnv env, final Path rootDir,
    final TableDescriptor tableDescriptor, final RegionInfo[] newRegions, final RegionFillTask task)
    throws IOException {
    if (newRegions == null) return null;
    int regionNumber = newRegions.length;
    CompletionService<RegionInfo> completionService = new ExecutorCompletionService<>(exec);
    List<RegionInfo> regionInfos = new ArrayList<>();
    for (final RegionInfo newRegion : newRegions) {
      completionService.submit(new Callable<RegionInfo>() {
        @Override
        public RegionInfo call() throws IOException {
          return createRegion(conf, env, rootDir, tableDescriptor, newRegion, task);
        }
      });
    }
    try {
      // wait for all regions to finish creation
      for (int i = 0; i < regionNumber; i++) {
        regionInfos.add(completionService.take().get());
      }
    } catch (InterruptedException e) {
      LOG.error("Interrupted during region creation", e);
      throw new InterruptedIOException(e.getMessage());
    } catch (ExecutionException e) {
      throw new IOException(e);
    }
    return regionInfos;
  }

  /**
   * Create a new region on the specified file-system.
   * @param conf            {@link Configuration}
   * @param env             {@link MasterProcedureEnv}; may be null
   * @param rootDir         Root directory for HBase instance
   * @param tableDescriptor description of the table
   * @param newRegion       {@link RegionInfo} that describes the region to create
   * @param task            {@link RegionFillTask} custom code to populate region after creation
   * @return the {@link RegionInfo} of the created region
   */
  public static RegionInfo createRegion(final Configuration conf, final MasterProcedureEnv env,
    final Path rootDir, final TableDescriptor tableDescriptor, final RegionInfo newRegion,
    final RegionFillTask task) throws IOException {
    // 1. Create HRegion
    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
    // unless it is passed along via the conf.
    Configuration confForWAL = new Configuration(conf);
    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
    HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, tableDescriptor, null, false,
      null, env == null ? null : env.getMasterServices());
    try {
      // 2. Custom user code to interact with the created region
      if (task != null) {
        task.fillRegion(region);
      }
    } finally {
      // 3. Close the new region to flush to disk. Close log file too.
      region.close(false, true);
    }
    return region.getRegionInfo();
  }

  /**
   * Execute the task on the specified set of regions.
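   * <p>
   * For example (illustrative only; {@code exec} and {@code regions} are assumed to be supplied by
   * the caller):
   *
   * <pre>{@code
   * ModifyRegionUtils.editRegions(exec, regions, region -> {
   *   // custom per-region work goes here
   * });
   * }</pre>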
   * @param exec    Thread Pool Executor
   * @param regions {@link RegionInfo} that describes the regions to edit
   * @param task    {@link RegionEditTask} custom code to edit the region
   */
  public static void editRegions(final ThreadPoolExecutor exec,
    final Collection<RegionInfo> regions, final RegionEditTask task) throws IOException {
    final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(exec);
    for (final RegionInfo hri : regions) {
      completionService.submit(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          task.editRegion(hri);
          return null;
        }
      });
    }

    try {
      // wait for all edit tasks to complete
      for (RegionInfo hri : regions) {
        completionService.take().get();
      }
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
  }

  /*
   * Used by createRegions() to get a thread pool whose size comes from the
   * "hbase.hregion.open.and.init.threads.max" property (default 16), capped at the number of
   * regions to create.
   */
  static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
    final String threadNamePrefix, int regionNumber) {
    int maxThreads =
      Math.min(regionNumber, conf.getInt("hbase.hregion.open.and.init.threads.max", 16));
    ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.getBoundedCachedThreadPool(maxThreads,
      30L, TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d")
        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    return regionOpenAndInitThreadPool;
  }
}