/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility methods for interacting with the regions.
 */
@InterfaceAudience.Private
public abstract class ModifyRegionUtils {
  private static final Logger LOG = LoggerFactory.getLogger(ModifyRegionUtils.class);

  private ModifyRegionUtils() {
  }

  /**
   * Callback run by {@link #createRegion} against each newly created region before that
   * region is closed.
   */
  public interface RegionFillTask {
    void fillRegion(final HRegion region) throws IOException;
  }

  /**
   * Callback run by {@link #editRegions} once for every region in the supplied collection.
   */
  public interface RegionEditTask {
    void editRegion(final RegionInfo region) throws IOException;
  }

  /**
   * Build the {@link RegionInfo}s that cover the whole key space of a table, cut at the
   * given split keys.
   * <p>
   * With {@code n} split keys the result holds {@code n + 1} regions: the first starts at a
   * {@code null} key, the last ends at a {@code null} key, and each split key is both the end
   * key of one region and the start key of the next. A {@code null} or empty
   * {@code splitKeys} yields a single region with {@code null} start and end keys. All regions
   * share one region id taken from {@link System#currentTimeMillis()}.
   *
   * @param tableDescriptor descriptor whose table name is used for every region
   * @param splitKeys split points in the order the regions should appear; may be
   *          {@code null} or empty
   * @return the region descriptions, never {@code null}
   */
  public static RegionInfo[] createRegionInfos(TableDescriptor tableDescriptor,
      byte[][] splitKeys) {
    long regionId = System.currentTimeMillis();
    int splitCount = (splitKeys == null) ? 0 : splitKeys.length;
    RegionInfo[] hRegionInfos = new RegionInfo[splitCount + 1];
    byte[] startKey = null;
    for (int i = 0; i <= splitCount; i++) {
      // The last region is open-ended, hence the null end key.
      byte[] endKey = (i == splitCount) ? null : splitKeys[i];
      hRegionInfos[i] = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
          .setStartKey(startKey)
          .setEndKey(endKey)
          .setSplit(false)
          .setRegionId(regionId)
          .build();
      startKey = endKey;
    }
    return hRegionInfos;
  }

  /**
   * Create new set of regions on the specified file-system.
   * NOTE: that you should add the regions to hbase:meta after this operation.
   * <p>
   * A dedicated thread pool is created for the call and shut down (via
   * {@link ThreadPoolExecutor#shutdownNow()}) before the method returns, whether it succeeds
   * or fails.
   *
   * @param conf {@link Configuration}
   * @param rootDir Root directory for HBase instance
   * @param tableDescriptor description of the table
   * @param newRegions {@link RegionInfo} that describes the regions to create
   * @param task {@link RegionFillTask} custom code to populate region after creation
   * @return the created regions in completion order, or {@code null} if {@code newRegions}
   *         is {@code null}
   * @throws IOException if creating any region fails or the calling thread is interrupted
   */
  public static List<RegionInfo> createRegions(final Configuration conf, final Path rootDir,
      final TableDescriptor tableDescriptor, final RegionInfo[] newRegions,
      final RegionFillTask task) throws IOException {
    if (newRegions == null) {
      return null;
    }
    int regionNumber = newRegions.length;
    ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
        "RegionOpenAndInitThread-" + tableDescriptor.getTableName(), regionNumber);
    try {
      return createRegions(exec, conf, rootDir, tableDescriptor, newRegions, task);
    } finally {
      exec.shutdownNow();
    }
  }

  /**
   * Create new set of regions on the specified file-system.
   * NOTE: that you should add the regions to hbase:meta after this operation.
   * <p>
   * One {@link #createRegion} task per region is submitted to {@code exec}; the method then
   * blocks until every task has completed or the first failure is observed. The executor is
   * not shut down here.
   *
   * @param exec Thread Pool Executor
   * @param conf {@link Configuration}
   * @param rootDir Root directory for HBase instance
   * @param tableDescriptor description of the table
   * @param newRegions {@link RegionInfo} that describes the regions to create
   * @param task {@link RegionFillTask} custom code to populate region after creation
   * @return the created regions in completion order, or {@code null} if {@code newRegions}
   *         is {@code null}
   * @throws IOException wrapping the {@link ExecutionException} of the first failed task, or
   *           an {@link InterruptedIOException} if the calling thread is interrupted while
   *           waiting
   */
  public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec,
      final Configuration conf, final Path rootDir,
      final TableDescriptor tableDescriptor, final RegionInfo[] newRegions,
      final RegionFillTask task) throws IOException {
    if (newRegions == null) {
      return null;
    }
    int regionNumber = newRegions.length;
    CompletionService<RegionInfo> completionService = new ExecutorCompletionService<>(exec);
    List<RegionInfo> regionInfos = new ArrayList<>(regionNumber);
    for (final RegionInfo newRegion : newRegions) {
      completionService.submit(new Callable<RegionInfo>() {
        @Override
        public RegionInfo call() throws IOException {
          return createRegion(conf, rootDir, tableDescriptor, newRegion, task);
        }
      });
    }
    try {
      // wait for all regions to finish creation
      for (int i = 0; i < regionNumber; i++) {
        regionInfos.add(completionService.take().get());
      }
    } catch (InterruptedException e) {
      LOG.error("Caught " + e + " during region creation");
      throw toInterruptedIOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e);
    }
    return regionInfos;
  }

  /**
   * Create a single new region on the specified file-system.
   * <p>
   * The region is created, handed to {@code task} (when one is given) and then closed; the
   * close happens even if {@code task} throws.
   *
   * @param conf {@link Configuration}
   * @param rootDir Root directory for HBase instance
   * @param tableDescriptor description of the table
   * @param newRegion {@link RegionInfo} that describes the region to create
   * @param task {@link RegionFillTask} custom code to populate region after creation; may be
   *          {@code null}
   * @return the {@link RegionInfo} reported by the created region
   * @throws IOException if creating, filling or closing the region fails
   */
  public static RegionInfo createRegion(final Configuration conf, final Path rootDir,
      final TableDescriptor tableDescriptor, final RegionInfo newRegion,
      final RegionFillTask task) throws IOException {
    // 1. Create HRegion
    // The WAL subsystem will use the default rootDir rather than the passed in rootDir
    // unless I pass along via the conf.
    // NOTE(review): confForWAL is prepared here but the call below still receives the
    // original conf, so the override is never seen by the region. Left as-is because the
    // intended behaviour cannot be confirmed from this file -- TODO confirm and either pass
    // confForWAL or drop it.
    Configuration confForWAL = new Configuration(conf);
    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
    HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, tableDescriptor, null, false);
    try {
      // 2. Custom user code to interact with the created region
      if (task != null) {
        task.fillRegion(region);
      }
    } finally {
      // 3. Close the new region to flush to disk. Close log file too.
      region.close();
    }
    return region.getRegionInfo();
  }

  /**
   * Execute the task on the specified set of regions.
   * <p>
   * One task per region is submitted to {@code exec}; the method then blocks until every
   * submitted task has completed or the first failure is observed.
   *
   * @param exec Thread Pool Executor
   * @param regions {@link RegionInfo} that describes the regions to edit
   * @param task {@link RegionEditTask} custom code to edit the region
   * @throws IOException whose cause is the exception thrown by the first failed task, or an
   *           {@link InterruptedIOException} if the calling thread is interrupted while
   *           waiting
   */
  public static void editRegions(final ThreadPoolExecutor exec,
      final Collection<RegionInfo> regions, final RegionEditTask task) throws IOException {
    final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(exec);
    // Count what was actually submitted so the wait loop below cannot disagree with it,
    // even if a second pass over the collection would yield a different number of elements.
    int submitted = 0;
    for (final RegionInfo hri : regions) {
      completionService.submit(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          task.editRegion(hri);
          return null;
        }
      });
      submitted++;
    }

    try {
      for (int i = 0; i < submitted; i++) {
        completionService.take().get();
      }
    } catch (InterruptedException e) {
      throw toInterruptedIOException(e);
    } catch (ExecutionException e) {
      // Surface the task's own failure as the cause rather than the ExecutionException wrapper.
      throw new IOException(e.getCause());
    }
  }

  /*
   * Converts an InterruptedException into an InterruptedIOException with the same message,
   * keeping the original exception as the cause.
   */
  private static InterruptedIOException toInterruptedIOException(InterruptedException e) {
    InterruptedIOException converted = new InterruptedIOException(e.getMessage());
    converted.initCause(e);
    return converted;
  }

  /*
   * used by createRegions() to get the thread pool executor based on the
   * "hbase.hregion.open.and.init.threads.max" property.
   *
   * The pool size is the smaller of regionNumber and the configured maximum (default 16),
   * but never less than 1: java.util.concurrent.ThreadPoolExecutor rejects a non-positive
   * maximum pool size, which would otherwise be requested for an empty region array or a
   * non-positive configuration value (assuming Threads.getBoundedCachedThreadPool hands the
   * size straight to a ThreadPoolExecutor -- TODO confirm).
   */
  static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
      final String threadNamePrefix, int regionNumber) {
    int configuredMax = conf.getInt("hbase.hregion.open.and.init.threads.max", 16);
    int maxThreads = Math.max(1, Math.min(regionNumber, configuredMax));
    ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
            new ThreadFactory() {
              // Atomic so that concurrent newThread() calls never hand out the same suffix.
              private final AtomicInteger count = new AtomicInteger(1);

              @Override
              public Thread newThread(Runnable r) {
                return new Thread(r, threadNamePrefix + "-" + count.getAndIncrement());
              }
            });
    return regionOpenAndInitThreadPool;
  }
}