001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.util;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.List;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CompletionService;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorCompletionService;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.regionserver.HRegion;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Utility methods for interacting with the regions.
048 */
049@InterfaceAudience.Private
050public abstract class ModifyRegionUtils {
051  private static final Logger LOG = LoggerFactory.getLogger(ModifyRegionUtils.class);
052
053  private ModifyRegionUtils() {
054  }
055
056  public interface RegionFillTask {
057    void fillRegion(final HRegion region) throws IOException;
058  }
059
060  public interface RegionEditTask {
061    void editRegion(final RegionInfo region) throws IOException;
062  }
063
064  public static RegionInfo[] createRegionInfos(TableDescriptor tableDescriptor,
065      byte[][] splitKeys) {
066    long regionId = System.currentTimeMillis();
067    RegionInfo[] hRegionInfos = null;
068    if (splitKeys == null || splitKeys.length == 0) {
069      hRegionInfos = new RegionInfo[]{
070          RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
071           .setStartKey(null)
072           .setEndKey(null)
073           .setSplit(false)
074           .setRegionId(regionId)
075           .build()
076      };
077    } else {
078      int numRegions = splitKeys.length + 1;
079      hRegionInfos = new RegionInfo[numRegions];
080      byte[] startKey = null;
081      byte[] endKey = null;
082      for (int i = 0; i < numRegions; i++) {
083        endKey = (i == splitKeys.length) ? null : splitKeys[i];
084        hRegionInfos[i] =
085            RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
086                .setStartKey(startKey)
087                .setEndKey(endKey)
088                .setSplit(false)
089                .setRegionId(regionId)
090                .build();
091        startKey = endKey;
092      }
093    }
094    return hRegionInfos;
095  }
096
097  /**
098   * Create new set of regions on the specified file-system.
099   * NOTE: that you should add the regions to hbase:meta after this operation.
100   *
101   * @param conf {@link Configuration}
102   * @param rootDir Root directory for HBase instance
103   * @param tableDescriptor description of the table
104   * @param newRegions {@link RegionInfo} that describes the regions to create
105   * @param task {@link RegionFillTask} custom code to populate region after creation
106   * @throws IOException
107   */
108  public static List<RegionInfo> createRegions(final Configuration conf, final Path rootDir,
109      final TableDescriptor tableDescriptor, final RegionInfo[] newRegions,
110      final RegionFillTask task) throws IOException {
111    if (newRegions == null) return null;
112    int regionNumber = newRegions.length;
113    ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
114        "RegionOpenAndInitThread-" + tableDescriptor.getTableName(), regionNumber);
115    try {
116      return createRegions(exec, conf, rootDir, tableDescriptor, newRegions, task);
117    } finally {
118      exec.shutdownNow();
119    }
120  }
121
122  /**
123   * Create new set of regions on the specified file-system.
124   * NOTE: that you should add the regions to hbase:meta after this operation.
125   *
126   * @param exec Thread Pool Executor
127   * @param conf {@link Configuration}
128   * @param rootDir Root directory for HBase instance
129   * @param tableDescriptor description of the table
130   * @param newRegions {@link RegionInfo} that describes the regions to create
131   * @param task {@link RegionFillTask} custom code to populate region after creation
132   * @throws IOException
133   */
134  public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec,
135                                                final Configuration conf, final Path rootDir,
136                                                final TableDescriptor tableDescriptor, final RegionInfo[] newRegions,
137                                                final RegionFillTask task) throws IOException {
138    if (newRegions == null) return null;
139    int regionNumber = newRegions.length;
140    CompletionService<RegionInfo> completionService = new ExecutorCompletionService<>(exec);
141    List<RegionInfo> regionInfos = new ArrayList<>();
142    for (final RegionInfo newRegion : newRegions) {
143      completionService.submit(new Callable<RegionInfo>() {
144        @Override
145        public RegionInfo call() throws IOException {
146          return createRegion(conf, rootDir, tableDescriptor, newRegion, task);
147        }
148      });
149    }
150    try {
151      // wait for all regions to finish creation
152      for (int i = 0; i < regionNumber; i++) {
153        regionInfos.add(completionService.take().get());
154      }
155    } catch (InterruptedException e) {
156      LOG.error("Caught " + e + " during region creation");
157      throw new InterruptedIOException(e.getMessage());
158    } catch (ExecutionException e) {
159      throw new IOException(e);
160    }
161    return regionInfos;
162  }
163
164  /**
165   * Create new set of regions on the specified file-system.
166   * @param conf {@link Configuration}
167   * @param rootDir Root directory for HBase instance
168   * @param tableDescriptor description of the table
169   * @param newRegion {@link RegionInfo} that describes the region to create
170   * @param task {@link RegionFillTask} custom code to populate region after creation
171   * @throws IOException
172   */
173  public static RegionInfo createRegion(final Configuration conf, final Path rootDir,
174      final TableDescriptor tableDescriptor, final RegionInfo newRegion,
175      final RegionFillTask task) throws IOException {
176    // 1. Create HRegion
177    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
178    // unless I pass along via the conf.
179    Configuration confForWAL = new Configuration(conf);
180    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
181    HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, tableDescriptor, null, false);
182    try {
183      // 2. Custom user code to interact with the created region
184      if (task != null) {
185        task.fillRegion(region);
186      }
187    } finally {
188      // 3. Close the new region to flush to disk. Close log file too.
189      region.close();
190    }
191    return region.getRegionInfo();
192  }
193
194  /**
195   * Execute the task on the specified set of regions.
196   *
197   * @param exec Thread Pool Executor
198   * @param regions {@link RegionInfo} that describes the regions to edit
199   * @param task {@link RegionFillTask} custom code to edit the region
200   * @throws IOException
201   */
202  public static void editRegions(final ThreadPoolExecutor exec,
203      final Collection<RegionInfo> regions, final RegionEditTask task) throws IOException {
204    final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(exec);
205    for (final RegionInfo hri: regions) {
206      completionService.submit(new Callable<Void>() {
207        @Override
208        public Void call() throws IOException {
209          task.editRegion(hri);
210          return null;
211        }
212      });
213    }
214
215    try {
216      for (RegionInfo hri: regions) {
217        completionService.take().get();
218      }
219    } catch (InterruptedException e) {
220      throw new InterruptedIOException(e.getMessage());
221    } catch (ExecutionException e) {
222      IOException ex = new IOException();
223      ex.initCause(e.getCause());
224      throw ex;
225    }
226  }
227
228  /*
229   * used by createRegions() to get the thread pool executor based on the
230   * "hbase.hregion.open.and.init.threads.max" property.
231   */
232  static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
233      final String threadNamePrefix, int regionNumber) {
234    int maxThreads = Math.min(regionNumber, conf.getInt(
235        "hbase.hregion.open.and.init.threads.max", 16));
236    ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
237    .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
238        new ThreadFactory() {
239          private int count = 1;
240
241          @Override
242          public Thread newThread(Runnable r) {
243            return new Thread(r, threadNamePrefix + "-" + count++);
244          }
245        });
246    return regionOpenAndInitThreadPool;
247  }
248}