1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.handler;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableMap;
27 import java.util.Set;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HRegionInfo;
34 import org.apache.hadoop.hbase.Server;
35 import org.apache.hadoop.hbase.ServerName;
36 import org.apache.hadoop.hbase.catalog.CatalogTracker;
37 import org.apache.hadoop.hbase.catalog.MetaReader;
38 import org.apache.hadoop.hbase.client.Result;
39 import org.apache.hadoop.hbase.executor.EventHandler;
40 import org.apache.hadoop.hbase.executor.EventType;
41 import org.apache.hadoop.hbase.master.AssignmentManager;
42 import org.apache.hadoop.hbase.master.DeadServer;
43 import org.apache.hadoop.hbase.master.MasterServices;
44 import org.apache.hadoop.hbase.master.RegionState;
45 import org.apache.hadoop.hbase.master.RegionStates;
46 import org.apache.hadoop.hbase.master.ServerManager;
47 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
48 import org.apache.zookeeper.KeeperException;
49
50
51
52
53
54
55 @InterfaceAudience.Private
56 public class ServerShutdownHandler extends EventHandler {
57 private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
58 protected final ServerName serverName;
59 protected final MasterServices services;
60 protected final DeadServer deadServers;
61 protected final boolean shouldSplitHlog;
62 protected final boolean distributedLogReplay;
63 protected final int regionAssignmentWaitTimeout;
64
65 public ServerShutdownHandler(final Server server, final MasterServices services,
66 final DeadServer deadServers, final ServerName serverName,
67 final boolean shouldSplitHlog) {
68 this(server, services, deadServers, serverName, EventType.M_SERVER_SHUTDOWN,
69 shouldSplitHlog);
70 }
71
72 ServerShutdownHandler(final Server server, final MasterServices services,
73 final DeadServer deadServers, final ServerName serverName, EventType type,
74 final boolean shouldSplitHlog) {
75 super(server, type);
76 this.serverName = serverName;
77 this.server = server;
78 this.services = services;
79 this.deadServers = deadServers;
80 if (!this.deadServers.isDeadServer(this.serverName)) {
81 LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
82 }
83 this.shouldSplitHlog = shouldSplitHlog;
84 this.distributedLogReplay = server.getConfiguration().getBoolean(
85 HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
86 HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
87 this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
88 HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
89 }
90
91 @Override
92 public String getInformativeName() {
93 if (serverName != null) {
94 return this.getClass().getSimpleName() + " for " + serverName;
95 } else {
96 return super.getInformativeName();
97 }
98 }
99
100
101
102
103 boolean isCarryingMeta() {
104 return false;
105 }
106
107 @Override
108 public String toString() {
109 String name = "UnknownServerName";
110 if(server != null && server.getServerName() != null) {
111 name = server.getServerName().toString();
112 }
113 return getClass().getSimpleName() + "-" + name + "-" + getSeqid();
114 }
115
116 @Override
117 public void process() throws IOException {
118 final ServerName serverName = this.serverName;
119 try {
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 if (isCarryingMeta()
143 || !services.getAssignmentManager().isFailoverCleanupDone()) {
144 this.services.getServerManager().processDeadServer(serverName, this.shouldSplitHlog);
145 return;
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163 NavigableMap<HRegionInfo, Result> hris = null;
164 while (!this.server.isStopped()) {
165 try {
166 this.server.getCatalogTracker().waitForMeta();
167 hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
168 this.serverName);
169 break;
170 } catch (InterruptedException e) {
171 Thread.currentThread().interrupt();
172 throw new IOException("Interrupted", e);
173 } catch (IOException ioe) {
174 LOG.info("Received exception accessing META during server shutdown of " +
175 serverName + ", retrying META read", ioe);
176 }
177 }
178 if (this.server.isStopped()) {
179 throw new IOException("Server is stopped");
180 }
181
182 try {
183 if (this.shouldSplitHlog) {
184 LOG.info("Splitting logs for " + serverName + " before assignment.");
185 if(this.distributedLogReplay){
186 Set<ServerName> serverNames = new HashSet<ServerName>();
187 serverNames.add(serverName);
188 this.services.getMasterFileSystem().prepareLogReplay(serverNames);
189 } else {
190 this.services.getMasterFileSystem().splitLog(serverName);
191 }
192 } else {
193 LOG.info("Skipping log splitting for " + serverName);
194 }
195 } catch (IOException ioe) {
196 resubmit(serverName, ioe);
197 }
198
199
200
201
202
203 AssignmentManager am = services.getAssignmentManager();
204 List<HRegionInfo> regionsInTransition = am.processServerShutdown(serverName);
205 LOG.info("Reassigning " + ((hris == null)? 0: hris.size()) +
206 " region(s) that " + (serverName == null? "null": serverName) +
207 " was carrying (and " + regionsInTransition.size() +
208 " regions(s) that were opening on this server)");
209
210 List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
211 toAssignRegions.addAll(regionsInTransition);
212
213
214 if (hris != null) {
215 RegionStates regionStates = am.getRegionStates();
216 for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
217 HRegionInfo hri = e.getKey();
218 if (regionsInTransition.contains(hri)) {
219 continue;
220 }
221 RegionState rit = regionStates.getRegionTransitionState(hri);
222 if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {
223 ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
224 if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
225
226
227 LOG.info("Skip assigning region " + hri.getRegionNameAsString()
228 + " because it has been opened in " + addressFromAM.getServerName());
229 continue;
230 }
231 if (rit != null) {
232 if (!rit.isOnServer(serverName)
233 || rit.isClosed() || rit.isOpened() || rit.isSplit()) {
234
235
236 LOG.info("Skip assigning region " + rit);
237 continue;
238 }
239 try{
240
241 LOG.info("Reassigning region with rs = " + rit + " and deleting zk node if exists");
242 ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), hri);
243 } catch (KeeperException ke) {
244 this.server.abort("Unexpected ZK exception deleting unassigned node " + hri, ke);
245 return;
246 }
247 }
248 toAssignRegions.add(hri);
249 } else if (rit != null) {
250 if (rit.isSplitting() || rit.isSplit()) {
251
252
253
254
255
256
257 am.regionOffline(hri);
258 } else if ((rit.isClosing() || rit.isPendingClose())
259 && am.getZKTable().isDisablingOrDisabledTable(hri.getTableNameAsString())) {
260
261
262
263
264
265 am.deleteClosingOrClosedNode(hri);
266 am.regionOffline(hri);
267 } else {
268 LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
269 + rit + " not to be assigned by SSH of server " + serverName);
270 }
271 }
272 }
273 }
274
275 try {
276 am.assign(toAssignRegions);
277 } catch (InterruptedException ie) {
278 LOG.error("Caught " + ie + " during round-robin assignment");
279 throw new IOException(ie);
280 }
281
282 try {
283 if (this.shouldSplitHlog && this.distributedLogReplay) {
284
285 for (HRegionInfo hri : toAssignRegions) {
286 if (!am.waitOnRegionToClearRegionsInTransition(hri, regionAssignmentWaitTimeout)) {
287 throw new IOException("Region " + hri.getEncodedName()
288 + " didn't complete assignment in time");
289 }
290 }
291 this.services.getMasterFileSystem().splitLog(serverName);
292 }
293 } catch (Exception ex) {
294 if (ex instanceof IOException) {
295 resubmit(serverName, (IOException)ex);
296 } else {
297 throw new IOException(ex);
298 }
299 }
300 } finally {
301 this.deadServers.finish(serverName);
302 }
303
304 LOG.info("Finished processing of shutdown of " + serverName);
305 }
306
307 private void resubmit(final ServerName serverName, IOException ex) throws IOException {
308
309
310 this.services.getExecutorService().submit((ServerShutdownHandler) this);
311 this.deadServers.add(serverName);
312 throw new IOException("failed log splitting for " + serverName + ", will retry", ex);
313 }
314
315
316
317
318
319
320
321
322
323
324
325 public static boolean processDeadRegion(HRegionInfo hri, Result result,
326 AssignmentManager assignmentManager, CatalogTracker catalogTracker)
327 throws IOException {
328 boolean tablePresent = assignmentManager.getZKTable().isTablePresent(
329 hri.getTableNameAsString());
330 if (!tablePresent) {
331 LOG.info("The table " + hri.getTableNameAsString()
332 + " was deleted. Hence not proceeding.");
333 return false;
334 }
335
336 boolean disabled = assignmentManager.getZKTable().isDisabledTable(
337 hri.getTableNameAsString());
338 if (disabled){
339 LOG.info("The table " + hri.getTableNameAsString()
340 + " was disabled. Hence not proceeding.");
341 return false;
342 }
343 if (hri.isOffline() && hri.isSplit()) {
344
345
346
347 return false;
348 }
349 boolean disabling = assignmentManager.getZKTable().isDisablingTable(
350 hri.getTableNameAsString());
351 if (disabling) {
352 LOG.info("The table " + hri.getTableNameAsString()
353 + " is disabled. Hence not assigning region" + hri.getEncodedName());
354 return false;
355 }
356 return true;
357 }
358 }