001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.util; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.List; 027import java.util.concurrent.Callable; 028import java.util.concurrent.CompletionService; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.ExecutorCompletionService; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.client.RegionInfo; 038import org.apache.hadoop.hbase.client.RegionInfoBuilder; 039import org.apache.hadoop.hbase.client.TableDescriptor; 040import org.apache.hadoop.hbase.regionserver.HRegion; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * Utility methods for interacting with the regions. 047 */ 048@InterfaceAudience.Private 049public abstract class ModifyRegionUtils { 050 private static final Logger LOG = LoggerFactory.getLogger(ModifyRegionUtils.class); 051 052 private ModifyRegionUtils() { 053 } 054 055 public interface RegionFillTask { 056 void fillRegion(final HRegion region) throws IOException; 057 } 058 059 public interface RegionEditTask { 060 void editRegion(final RegionInfo region) throws IOException; 061 } 062 063 public static RegionInfo[] createRegionInfos(TableDescriptor tableDescriptor, 064 byte[][] splitKeys) { 065 long regionId = System.currentTimeMillis(); 066 RegionInfo[] hRegionInfos = null; 067 if (splitKeys == null || splitKeys.length == 0) { 068 hRegionInfos = new RegionInfo[]{ 069 RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()) 070 .setStartKey(null) 071 .setEndKey(null) 072 .setSplit(false) 073 .setRegionId(regionId) 074 .build() 075 }; 076 } else { 077 int numRegions = splitKeys.length + 1; 078 hRegionInfos = new RegionInfo[numRegions]; 079 byte[] startKey = null; 080 byte[] endKey = null; 081 for (int i = 0; i < numRegions; i++) { 082 endKey = (i == splitKeys.length) ? null : splitKeys[i]; 083 hRegionInfos[i] = 084 RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()) 085 .setStartKey(startKey) 086 .setEndKey(endKey) 087 .setSplit(false) 088 .setRegionId(regionId) 089 .build(); 090 startKey = endKey; 091 } 092 } 093 return hRegionInfos; 094 } 095 096 /** 097 * Create new set of regions on the specified file-system. 098 * NOTE: that you should add the regions to hbase:meta after this operation. 099 * 100 * @param conf {@link Configuration} 101 * @param rootDir Root directory for HBase instance 102 * @param tableDescriptor description of the table 103 * @param newRegions {@link RegionInfo} that describes the regions to create 104 * @param task {@link RegionFillTask} custom code to populate region after creation 105 * @throws IOException 106 */ 107 public static List<RegionInfo> createRegions(final Configuration conf, final Path rootDir, 108 final TableDescriptor tableDescriptor, final RegionInfo[] newRegions, 109 final RegionFillTask task) throws IOException { 110 if (newRegions == null) return null; 111 int regionNumber = newRegions.length; 112 ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf, 113 "RegionOpenAndInit-" + tableDescriptor.getTableName(), regionNumber); 114 try { 115 return createRegions(exec, conf, rootDir, tableDescriptor, newRegions, task); 116 } finally { 117 exec.shutdownNow(); 118 } 119 } 120 121 /** 122 * Create new set of regions on the specified file-system. 123 * NOTE: that you should add the regions to hbase:meta after this operation. 124 * 125 * @param exec Thread Pool Executor 126 * @param conf {@link Configuration} 127 * @param rootDir Root directory for HBase instance 128 * @param tableDescriptor description of the table 129 * @param newRegions {@link RegionInfo} that describes the regions to create 130 * @param task {@link RegionFillTask} custom code to populate region after creation 131 * @throws IOException 132 */ 133 public static List<RegionInfo> createRegions(final ThreadPoolExecutor exec, 134 final Configuration conf, final Path rootDir, 135 final TableDescriptor tableDescriptor, final RegionInfo[] newRegions, 136 final RegionFillTask task) throws IOException { 137 if (newRegions == null) return null; 138 int regionNumber = newRegions.length; 139 CompletionService<RegionInfo> completionService = new ExecutorCompletionService<>(exec); 140 List<RegionInfo> regionInfos = new ArrayList<>(); 141 for (final RegionInfo newRegion : newRegions) { 142 completionService.submit(new Callable<RegionInfo>() { 143 @Override 144 public RegionInfo call() throws IOException { 145 return createRegion(conf, rootDir, tableDescriptor, newRegion, task); 146 } 147 }); 148 } 149 try { 150 // wait for all regions to finish creation 151 for (int i = 0; i < regionNumber; i++) { 152 regionInfos.add(completionService.take().get()); 153 } 154 } catch (InterruptedException e) { 155 LOG.error("Caught " + e + " during region creation"); 156 throw new InterruptedIOException(e.getMessage()); 157 } catch (ExecutionException e) { 158 throw new IOException(e); 159 } 160 return regionInfos; 161 } 162 163 /** 164 * Create new set of regions on the specified file-system. 165 * @param conf {@link Configuration} 166 * @param rootDir Root directory for HBase instance 167 * @param tableDescriptor description of the table 168 * @param newRegion {@link RegionInfo} that describes the region to create 169 * @param task {@link RegionFillTask} custom code to populate region after creation 170 * @throws IOException 171 */ 172 public static RegionInfo createRegion(final Configuration conf, final Path rootDir, 173 final TableDescriptor tableDescriptor, final RegionInfo newRegion, 174 final RegionFillTask task) throws IOException { 175 // 1. Create HRegion 176 // The WAL subsystem will use the default rootDir rather than the passed in rootDir 177 // unless I pass along via the conf. 178 Configuration confForWAL = new Configuration(conf); 179 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 180 HRegion region = HRegion.createHRegion(newRegion, rootDir, conf, tableDescriptor, null, false); 181 try { 182 // 2. Custom user code to interact with the created region 183 if (task != null) { 184 task.fillRegion(region); 185 } 186 } finally { 187 // 3. Close the new region to flush to disk. Close log file too. 188 region.close(); 189 } 190 return region.getRegionInfo(); 191 } 192 193 /** 194 * Execute the task on the specified set of regions. 195 * 196 * @param exec Thread Pool Executor 197 * @param regions {@link RegionInfo} that describes the regions to edit 198 * @param task {@link RegionFillTask} custom code to edit the region 199 * @throws IOException 200 */ 201 public static void editRegions(final ThreadPoolExecutor exec, 202 final Collection<RegionInfo> regions, final RegionEditTask task) throws IOException { 203 final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(exec); 204 for (final RegionInfo hri: regions) { 205 completionService.submit(new Callable<Void>() { 206 @Override 207 public Void call() throws IOException { 208 task.editRegion(hri); 209 return null; 210 } 211 }); 212 } 213 214 try { 215 for (RegionInfo hri: regions) { 216 completionService.take().get(); 217 } 218 } catch (InterruptedException e) { 219 throw new InterruptedIOException(e.getMessage()); 220 } catch (ExecutionException e) { 221 throw new IOException(e.getCause()); 222 } 223 } 224 225 /* 226 * used by createRegions() to get the thread pool executor based on the 227 * "hbase.hregion.open.and.init.threads.max" property. 228 */ 229 static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf, 230 final String threadNamePrefix, int regionNumber) { 231 int maxThreads = Math.min(regionNumber, conf.getInt( 232 "hbase.hregion.open.and.init.threads.max", 16)); 233 ThreadPoolExecutor regionOpenAndInitThreadPool = Threads. 234 getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, 235 Threads.newDaemonThreadFactory(threadNamePrefix)); 236 return regionOpenAndInitThreadPool; 237 } 238}