001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.List; 025import java.util.concurrent.Callable; 026import java.util.concurrent.CompletionService; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.ExecutorCompletionService; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.client.RegionInfoBuilder; 036import org.apache.hadoop.hbase.client.TableDescriptor; 037import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 038import org.apache.hadoop.hbase.regionserver.HRegion; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 044 045/** 046 * Utility methods for interacting with the regions. 047 */ 048@InterfaceAudience.Private 049public abstract class ModifyRegionUtils { 050 private static final Logger LOG = LoggerFactory.getLogger(ModifyRegionUtils.class); 051 052 private ModifyRegionUtils() { 053 } 054 055 public interface RegionFillTask { 056 void fillRegion(final HRegion region) throws IOException; 057 } 058 059 public interface RegionEditTask { 060 void editRegion(final RegionInfo region) throws IOException; 061 } 062 063 public static RegionInfo[] createRegionInfos(TableDescriptor tableDescriptor, 064 byte[][] splitKeys) { 065 long regionId = EnvironmentEdgeManager.currentTime(); 066 RegionInfo[] hRegionInfos = null; 067 if (splitKeys == null || splitKeys.length == 0) { 068 hRegionInfos = new RegionInfo[] { RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()) 069 .setStartKey(null).setEndKey(null).setSplit(false).setRegionId(regionId).build() }; 070 } else { 071 int numRegions = splitKeys.length + 1; 072 hRegionInfos = new RegionInfo[numRegions]; 073 byte[] startKey = null; 074 byte[] endKey = null; 075 for (int i = 0; i < numRegions; i++) { 076 endKey = (i == splitKeys.length) ? null : splitKeys[i]; 077 hRegionInfos[i] = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()) 078 .setStartKey(startKey).setEndKey(endKey).setSplit(false).setRegionId(regionId).build(); 079 startKey = endKey; 080 } 081 } 082 return hRegionInfos; 083 } 084 085 /** 086 * Create new set of regions on the specified file-system. NOTE: that you should add the regions 087 * to hbase:meta after this operation. 088 * @param env {@link MasterProcedureEnv} 089 * @param rootDir Root directory for HBase instance 090 * @param tableDescriptor description of the table 091 * @param newRegions {@link RegionInfo} that describes the regions to create 092 * @param task {@link RegionFillTask} custom code to populate region after creation 093 */ 094 public static List<RegionInfo> createRegions(final MasterProcedureEnv env, final Path rootDir, 095 final TableDescriptor tableDescriptor, final RegionInfo[] newRegions, final RegionFillTask task) 096 throws IOException { 097 if (newRegions == null) return null; 098 int regionNumber = newRegions.length; 099 ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(env.getMasterConfiguration(), 100 "RegionOpenAndInit-" + tableDescriptor.getTableName(), regionNumber); 101 try { 102 return createRegions(exec, env.getMasterConfiguration(), env, rootDir, tableDescriptor, 103 newRegions, task); 104 } finally { 105 exec.shutdownNow(); 106 } 107 } 108 109 public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec, 110 final Configuration conf, final Path rootDir, final TableDescriptor tableDescriptor, 111 final RegionInfo[] newRegions, final RegionFillTask task) throws IOException { 112 return createRegions(exec, conf, null, rootDir, tableDescriptor, newRegions, task); 113 } 114 115 /** 116 * Create new set of regions on the specified file-system. NOTE: that you should add the regions 117 * to hbase:meta after this operation. 118 * @param exec Thread Pool Executor 119 * @param conf {@link Configuration} 120 * @param rootDir Root directory for HBase instance 121 * @param tableDescriptor description of the table 122 * @param newRegions {@link RegionInfo} that describes the regions to create 123 * @param task {@link RegionFillTask} custom code to populate region after creation 124 */ 125 public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec, 126 final Configuration conf, final MasterProcedureEnv env, final Path rootDir, 127 final TableDescriptor tableDescriptor, final RegionInfo[] newRegions, final RegionFillTask task) 128 throws IOException { 129 if (newRegions == null) return null; 130 int regionNumber = newRegions.length; 131 CompletionService<RegionInfo> completionService = new ExecutorCompletionService<>(exec); 132 List<RegionInfo> regionInfos = new ArrayList<>(); 133 for (final RegionInfo newRegion : newRegions) { 134 completionService.submit(new Callable<RegionInfo>() { 135 @Override 136 public RegionInfo call() throws IOException { 137 return createRegion(conf, env, rootDir, tableDescriptor, newRegion, task); 138 } 139 }); 140 } 141 try { 142 // wait for all regions to finish creation 143 for (int i = 0; i < regionNumber; i++) { 144 regionInfos.add(completionService.take().get()); 145 } 146 } catch (InterruptedException e) { 147 LOG.error("Caught " + e + " during region creation"); 148 throw new InterruptedIOException(e.getMessage()); 149 } catch (ExecutionException e) { 150 throw new IOException(e); 151 } 152 return regionInfos; 153 } 154 155 /** 156 * Create new set of regions on the specified file-system. 157 * @param conf {@link Configuration} 158 * @param rootDir Root directory for HBase instance 159 * @param tableDescriptor description of the table 160 * @param newRegion {@link RegionInfo} that describes the region to create 161 * @param task {@link RegionFillTask} custom code to populate region after creation 162 */ 163 public static RegionInfo createRegion(final Configuration conf, final MasterProcedureEnv env, 164 final Path rootDir, final TableDescriptor tableDescriptor, final RegionInfo newRegion, 165 final RegionFillTask task) throws IOException { 166 // 1. Create HRegion 167 // The WAL subsystem will use the default rootDir rather than the passed in rootDir 168 // unless I pass along via the conf. 169 Configuration confForWAL = new Configuration(conf); 170 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 171 HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, tableDescriptor, null, false, 172 null, env == null ? null : env.getMasterServices()); 173 try { 174 // 2. Custom user code to interact with the created region 175 if (task != null) { 176 task.fillRegion(region); 177 } 178 } finally { 179 // 3. Close the new region to flush to disk. Close log file too. 180 region.close(false, true); 181 } 182 return region.getRegionInfo(); 183 } 184 185 /** 186 * Execute the task on the specified set of regions. 187 * @param exec Thread Pool Executor 188 * @param regions {@link RegionInfo} that describes the regions to edit 189 * @param task {@link RegionFillTask} custom code to edit the region 190 */ 191 public static void editRegions(final ThreadPoolExecutor exec, 192 final Collection<RegionInfo> regions, final RegionEditTask task) throws IOException { 193 final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(exec); 194 for (final RegionInfo hri : regions) { 195 completionService.submit(new Callable<Void>() { 196 @Override 197 public Void call() throws IOException { 198 task.editRegion(hri); 199 return null; 200 } 201 }); 202 } 203 204 try { 205 for (RegionInfo hri : regions) { 206 completionService.take().get(); 207 } 208 } catch (InterruptedException e) { 209 throw new InterruptedIOException(e.getMessage()); 210 } catch (ExecutionException e) { 211 throw new IOException(e.getCause()); 212 } 213 } 214 215 /* 216 * used by createRegions() to get the thread pool executor based on the 217 * "hbase.hregion.open.and.init.threads.max" property. 218 */ 219 static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf, 220 final String threadNamePrefix, int regionNumber) { 221 int maxThreads = 222 Math.min(regionNumber, conf.getInt("hbase.hregion.open.and.init.threads.max", 16)); 223 ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.getBoundedCachedThreadPool(maxThreads, 224 30L, TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d") 225 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 226 return regionOpenAndInitThreadPool; 227 } 228}