1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.handler;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.HRegionInfo;
28 import org.apache.hadoop.hbase.HTableDescriptor;
29 import org.apache.hadoop.hbase.Server;
30 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
31 import org.apache.hadoop.hbase.executor.EventHandler;
32 import org.apache.hadoop.hbase.executor.EventType;
33 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
34 import org.apache.hadoop.hbase.regionserver.HRegion;
35 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
36 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
37 import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
38 import org.apache.hadoop.hbase.util.CancelableProgressable;
39 import org.apache.hadoop.hbase.util.ConfigUtil;
40
41
42
43
44
45 @InterfaceAudience.Private
46 public class OpenRegionHandler extends EventHandler {
47 private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
48
49 protected final RegionServerServices rsServices;
50
51 private final HRegionInfo regionInfo;
52 private final HTableDescriptor htd;
53 private final long masterSystemTime;
54
55 private OpenRegionCoordination coordination;
56 private OpenRegionCoordination.OpenRegionDetails ord;
57
58 private final boolean useZKForAssignment;
59
60 public OpenRegionHandler(final Server server,
61 final RegionServerServices rsServices, HRegionInfo regionInfo,
62 HTableDescriptor htd, long masterSystemTime, OpenRegionCoordination coordination,
63 OpenRegionCoordination.OpenRegionDetails ord) {
64 this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION,
65 masterSystemTime, coordination, ord);
66 }
67
68 protected OpenRegionHandler(final Server server,
69 final RegionServerServices rsServices, final HRegionInfo regionInfo,
70 final HTableDescriptor htd, EventType eventType, long masterSystemTime,
71 OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) {
72 super(server, eventType);
73 this.rsServices = rsServices;
74 this.regionInfo = regionInfo;
75 this.htd = htd;
76 this.coordination = coordination;
77 this.ord = ord;
78 useZKForAssignment = ConfigUtil.useZKForAssignment(server.getConfiguration());
79 this.masterSystemTime = masterSystemTime;
80 }
81
82 public HRegionInfo getRegionInfo() {
83 return regionInfo;
84 }
85
86 @Override
87 public void process() throws IOException {
88 boolean openSuccessful = false;
89 boolean transitionedToOpening = false;
90 final String regionName = regionInfo.getRegionNameAsString();
91 HRegion region = null;
92
93 try {
94 if (this.server.isStopped() || this.rsServices.isStopping()) {
95 return;
96 }
97 final String encodedName = regionInfo.getEncodedName();
98
99
100
101
102
103
104
105 if (this.rsServices.getFromOnlineRegions(encodedName) != null) {
106 LOG.error("Region " + encodedName +
107 " was already online when we started processing the opening. " +
108 "Marking this new attempt as failed");
109 return;
110 }
111
112
113
114
115 if (!isRegionStillOpening()){
116 LOG.error("Region " + encodedName + " opening cancelled");
117 return;
118 }
119
120 if (useZKForAssignment
121 && !coordination.transitionFromOfflineToOpening(regionInfo, ord)) {
122 LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
123
124 return;
125 }
126 transitionedToOpening = true;
127
128
129 region = openRegion();
130 if (region == null) {
131 return;
132 }
133
134 boolean failed = true;
135 if (isRegionStillOpening() && (!useZKForAssignment ||
136 coordination.tickleOpening(ord, regionInfo, rsServices, "post_region_open"))) {
137 if (updateMeta(region, masterSystemTime)) {
138 failed = false;
139 }
140 }
141 if (failed || this.server.isStopped() ||
142 this.rsServices.isStopping()) {
143 return;
144 }
145
146 if (!isRegionStillOpening() ||
147 (useZKForAssignment && !coordination.transitionToOpened(region, ord))) {
148
149
150
151
152
153 return;
154 }
155
156
157
158
159
160
161
162
163
164
165
166 this.rsServices.addToOnlineRegions(region);
167 openSuccessful = true;
168
169
170 LOG.debug("Opened " + regionName + " on " +
171 this.server.getServerName());
172
173
174 } finally {
175
176 if (!openSuccessful) {
177 doCleanUpOnFailedOpen(region, transitionedToOpening, ord);
178 }
179 final Boolean current = this.rsServices.getRegionsInTransitionInRS().
180 remove(this.regionInfo.getEncodedNameAsBytes());
181
182
183
184
185
186
187
188
189 if (openSuccessful) {
190 if (current == null) {
191 LOG.error("Bad state: we've just opened a region that was NOT in transition. Region="
192 + regionName);
193 } else if (Boolean.FALSE.equals(current)) {
194
195 LOG.error("Race condition: we've finished to open a region, while a close was requested "
196 + " on region=" + regionName + ". It can be a critical error, as a region that"
197 + " should be closed is now opened. Closing it now");
198 cleanupFailedOpen(region);
199 }
200 }
201 }
202 }
203
204 private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening,
205 OpenRegionCoordination.OpenRegionDetails ord)
206 throws IOException {
207 if (transitionedToOpening) {
208 try {
209 if (region != null) {
210 cleanupFailedOpen(region);
211 }
212 } finally {
213 if (!useZKForAssignment) {
214 rsServices.reportRegionStateTransition(TransitionCode.FAILED_OPEN, regionInfo);
215 } else {
216
217
218 coordination.tryTransitionFromOpeningToFailedOpen(regionInfo, ord);
219 }
220 }
221 } else if (!useZKForAssignment) {
222 rsServices.reportRegionStateTransition(TransitionCode.FAILED_OPEN, regionInfo);
223 } else {
224
225
226 coordination.tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, ord);
227 }
228 }
229
230
231
232
233
234
235
236
237 boolean updateMeta(final HRegion r, long masterSystemTime) {
238 if (this.server.isStopped() || this.rsServices.isStopping()) {
239 return false;
240 }
241
242
243 final AtomicBoolean signaller = new AtomicBoolean(false);
244 PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
245 this.server, this.rsServices, signaller, masterSystemTime);
246 t.start();
247
248
249
250
251
252 long now = System.currentTimeMillis();
253 long lastUpdate = now;
254 boolean tickleOpening = true;
255 while (!signaller.get() && t.isAlive() && !this.server.isStopped() &&
256 !this.rsServices.isStopping() && isRegionStillOpening()) {
257 long elapsed = now - lastUpdate;
258 if (elapsed > 120000) {
259
260 lastUpdate = now;
261 if (useZKForAssignment) {
262 tickleOpening = coordination.tickleOpening(
263 ord, regionInfo, rsServices, "post_open_deploy");
264 }
265 }
266 synchronized (signaller) {
267 try {
268
269
270 if (!signaller.get()) signaller.wait(10000);
271 } catch (InterruptedException e) {
272
273 }
274 }
275 now = System.currentTimeMillis();
276 }
277
278
279 if (t.isAlive()) {
280 if (!signaller.get()) {
281
282 LOG.debug("Interrupting thread " + t);
283 t.interrupt();
284 }
285 try {
286 t.join();
287 } catch (InterruptedException ie) {
288 LOG.warn("Interrupted joining " +
289 r.getRegionInfo().getRegionNameAsString(), ie);
290 Thread.currentThread().interrupt();
291 }
292 }
293
294
295
296
297 return ((!Thread.interrupted() && t.getException() == null) && tickleOpening);
298 }
299
300
301
302
303
304
305 static class PostOpenDeployTasksThread extends Thread {
306 private Throwable exception = null;
307 private final Server server;
308 private final RegionServerServices services;
309 private final HRegion region;
310 private final AtomicBoolean signaller;
311 private final long masterSystemTime;
312
313 PostOpenDeployTasksThread(final HRegion region, final Server server,
314 final RegionServerServices services, final AtomicBoolean signaller, long masterSystemTime) {
315 super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName());
316 this.setDaemon(true);
317 this.server = server;
318 this.services = services;
319 this.region = region;
320 this.signaller = signaller;
321 this.masterSystemTime = masterSystemTime;
322 }
323
324 @Override
325 public void run() {
326 try {
327 this.services.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
328 } catch (Throwable e) {
329 String msg = "Exception running postOpenDeployTasks; region=" +
330 this.region.getRegionInfo().getEncodedName();
331 this.exception = e;
332 if (e instanceof IOException
333 && isRegionStillOpening(region.getRegionInfo(), services)) {
334 server.abort(msg, e);
335 } else {
336 LOG.warn(msg, e);
337 }
338 }
339
340 this.signaller.set(true);
341 synchronized (this.signaller) {
342 this.signaller.notify();
343 }
344 }
345
346
347
348
349 Throwable getException() {
350 return this.exception;
351 }
352 }
353
354
355
356
357 HRegion openRegion() {
358 HRegion region = null;
359 try {
360
361
362 region = HRegion.openHRegion(this.regionInfo, this.htd,
363 this.rsServices.getWAL(this.regionInfo),
364 this.server.getConfiguration(),
365 this.rsServices,
366 new CancelableProgressable() {
367 @Override
368 public boolean progress() {
369 if (useZKForAssignment) {
370
371 return coordination.tickleOpening(ord, regionInfo,
372 rsServices, "open_region_progress");
373 }
374 if (!isRegionStillOpening()) {
375 LOG.warn("Open region aborted since it isn't opening any more");
376 return false;
377 }
378 return true;
379 }
380 });
381 } catch (Throwable t) {
382
383
384
385 LOG.error(
386 "Failed open of region=" + this.regionInfo.getRegionNameAsString()
387 + ", starting to roll back the global memstore size.", t);
388
389 if (this.rsServices != null) {
390 RegionServerAccounting rsAccounting =
391 this.rsServices.getRegionServerAccounting();
392 if (rsAccounting != null) {
393 rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName());
394 }
395 }
396 }
397 return region;
398 }
399
400 void cleanupFailedOpen(final HRegion region) throws IOException {
401 if (region != null) {
402 byte[] encodedName = regionInfo.getEncodedNameAsBytes();
403 try {
404 rsServices.getRegionsInTransitionInRS().put(encodedName,Boolean.FALSE);
405 this.rsServices.removeFromOnlineRegions(region, null);
406 region.close();
407 } finally {
408 rsServices.getRegionsInTransitionInRS().remove(encodedName);
409 }
410 }
411 }
412
413 private static boolean isRegionStillOpening(
414 HRegionInfo regionInfo, RegionServerServices rsServices) {
415 byte[] encodedName = regionInfo.getEncodedNameAsBytes();
416 Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
417 return Boolean.TRUE.equals(action);
418 }
419
420 private boolean isRegionStillOpening() {
421 return isRegionStillOpening(regionInfo, rsServices);
422 }
423 }