1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.util;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.CompletionService;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorCompletionService;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.HTableDescriptor;
42 import org.apache.hadoop.hbase.regionserver.HRegion;
43 import org.apache.hadoop.hbase.master.AssignmentManager;
44
45
46
47
48 @InterfaceAudience.Private
49 public abstract class ModifyRegionUtils {
50 private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
51
52 private ModifyRegionUtils() {
53 }
54
55 public interface RegionFillTask {
56 void fillRegion(final HRegion region) throws IOException;
57 }
58
59 public interface RegionEditTask {
60 void editRegion(final HRegionInfo region) throws IOException;
61 }
62
63 public static HRegionInfo[] createHRegionInfos(HTableDescriptor hTableDescriptor,
64 byte[][] splitKeys) {
65 long regionId = System.currentTimeMillis();
66 HRegionInfo[] hRegionInfos = null;
67 if (splitKeys == null || splitKeys.length == 0) {
68 hRegionInfos = new HRegionInfo[]{
69 new HRegionInfo(hTableDescriptor.getTableName(), null, null, false, regionId)
70 };
71 } else {
72 int numRegions = splitKeys.length + 1;
73 hRegionInfos = new HRegionInfo[numRegions];
74 byte[] startKey = null;
75 byte[] endKey = null;
76 for (int i = 0; i < numRegions; i++) {
77 endKey = (i == splitKeys.length) ? null : splitKeys[i];
78 hRegionInfos[i] =
79 new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey,
80 false, regionId);
81 startKey = endKey;
82 }
83 }
84 return hRegionInfos;
85 }
86
87
88
89
90
91
92
93
94
95
96
97 public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
98 final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions) throws IOException {
99 return createRegions(conf, rootDir, hTableDescriptor, newRegions, null);
100 }
101
102
103
104
105
106
107
108
109
110
111
112
113 public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
114 final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
115 final RegionFillTask task) throws IOException {
116
117 Path tableDir = FSUtils.getTableDir(rootDir, hTableDescriptor.getTableName());
118 return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
119 }
120
121
122
123
124
125
126
127
128
129
130
131
132
133 public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
134 final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
135 final RegionFillTask task) throws IOException {
136 if (newRegions == null) return null;
137 int regionNumber = newRegions.length;
138 ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
139 "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
140 try {
141 return createRegions(exec, conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
142 } finally {
143 exec.shutdownNow();
144 }
145 }
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160 public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
161 final Configuration conf, final Path rootDir, final Path tableDir,
162 final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
163 final RegionFillTask task) throws IOException {
164 if (newRegions == null) return null;
165 int regionNumber = newRegions.length;
166 CompletionService<HRegionInfo> completionService =
167 new ExecutorCompletionService<HRegionInfo>(exec);
168 List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
169 for (final HRegionInfo newRegion : newRegions) {
170 completionService.submit(new Callable<HRegionInfo>() {
171 @Override
172 public HRegionInfo call() throws IOException {
173 return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task);
174 }
175 });
176 }
177 try {
178
179 for (int i = 0; i < regionNumber; i++) {
180 regionInfos.add(completionService.take().get());
181 }
182 } catch (InterruptedException e) {
183 LOG.error("Caught " + e + " during region creation");
184 throw new InterruptedIOException(e.getMessage());
185 } catch (ExecutionException e) {
186 throw new IOException(e);
187 }
188 return regionInfos;
189 }
190
191
192
193
194
195
196
197
198
199
200
201 public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
202 final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
203 final RegionFillTask task) throws IOException {
204
205 HRegion region = HRegion.createHRegion(newRegion,
206 rootDir, tableDir, conf, hTableDescriptor, null,
207 false, true);
208 try {
209
210 if (task != null) {
211 task.fillRegion(region);
212 }
213 } finally {
214
215 region.close();
216 }
217 return region.getRegionInfo();
218 }
219
220
221
222
223
224
225
226
227
228 public static void editRegions(final ThreadPoolExecutor exec,
229 final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
230 final ExecutorCompletionService<Void> completionService =
231 new ExecutorCompletionService<Void>(exec);
232 for (final HRegionInfo hri: regions) {
233 completionService.submit(new Callable<Void>() {
234 @Override
235 public Void call() throws IOException {
236 task.editRegion(hri);
237 return null;
238 }
239 });
240 }
241
242 try {
243 for (HRegionInfo hri: regions) {
244 completionService.take().get();
245 }
246 } catch (InterruptedException e) {
247 throw new InterruptedIOException(e.getMessage());
248 } catch (ExecutionException e) {
249 IOException ex = new IOException();
250 ex.initCause(e.getCause());
251 throw ex;
252 }
253 }
254
255
256
257
258
259 static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
260 final String threadNamePrefix, int regionNumber) {
261 int maxThreads = Math.min(regionNumber, conf.getInt(
262 "hbase.hregion.open.and.init.threads.max", 10));
263 ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
264 .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
265 new ThreadFactory() {
266 private int count = 1;
267
268 @Override
269 public Thread newThread(Runnable r) {
270 return new Thread(r, threadNamePrefix + "-" + count++);
271 }
272 });
273 return regionOpenAndInitThreadPool;
274 }
275
276
277
278
279
280
281
282
283 public static void assignRegions(final AssignmentManager assignmentManager,
284 final List<HRegionInfo> regionInfos) throws IOException {
285 try {
286 assignmentManager.getRegionStates().createRegionStates(regionInfos);
287 assignmentManager.assign(regionInfos);
288 } catch (InterruptedException e) {
289 LOG.error("Caught " + e + " during round-robin assignment");
290 InterruptedIOException ie = new InterruptedIOException(e.getMessage());
291 ie.initCause(e);
292 throw ie;
293 }
294 }
295 }