1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.handler;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.HashSet;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.HRegionInfo;
30 import org.apache.hadoop.hbase.Server;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.executor.EventType;
33 import org.apache.hadoop.hbase.master.AssignmentManager;
34 import org.apache.hadoop.hbase.master.DeadServer;
35 import org.apache.hadoop.hbase.master.MasterServices;
36 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
37 import org.apache.hadoop.hbase.util.Threads;
38 import org.apache.zookeeper.KeeperException;
39
40 import com.google.common.annotations.VisibleForTesting;
41
42 import java.util.concurrent.atomic.AtomicInteger;
43
44
45
46
47 @InterfaceAudience.Private
48 public class MetaServerShutdownHandler extends ServerShutdownHandler {
49 private static final Log LOG = LogFactory.getLog(MetaServerShutdownHandler.class);
50 private AtomicInteger eventExceptionCount = new AtomicInteger(0);
51 @VisibleForTesting
52 static final int SHOW_STRACKTRACE_FREQUENCY = 100;
53
54 public MetaServerShutdownHandler(final Server server,
55 final MasterServices services,
56 final DeadServer deadServers, final ServerName serverName) {
57 super(server, services, deadServers, serverName,
58 EventType.M_META_SERVER_SHUTDOWN, true);
59 }
60
61 @Override
62 public void process() throws IOException {
63 boolean gotException = true;
64 try {
65 AssignmentManager am = this.services.getAssignmentManager();
66 this.services.getMasterFileSystem().setLogRecoveryMode();
67 boolean distributedLogReplay =
68 (this.services.getMasterFileSystem().getLogRecoveryMode() == RecoveryMode.LOG_REPLAY);
69 try {
70 if (this.shouldSplitWal) {
71 LOG.info("Splitting hbase:meta logs for " + serverName);
72 if (distributedLogReplay) {
73 Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
74 regions.add(HRegionInfo.FIRST_META_REGIONINFO);
75 this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
76 } else {
77 this.services.getMasterFileSystem().splitMetaLog(serverName);
78 }
79 am.getRegionStates().logSplit(HRegionInfo.FIRST_META_REGIONINFO);
80 }
81 } catch (IOException ioe) {
82 this.services.getExecutorService().submit(this);
83 this.deadServers.add(serverName);
84 throw new IOException("failed log splitting for " + serverName + ", will retry", ioe);
85 }
86
87
88
89
90 AssignmentManager.ServerHostRegion rsCarryingMetaRegion = am.isCarryingMeta(serverName);
91 switch (rsCarryingMetaRegion) {
92 case HOSTING_REGION:
93 LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
94 am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
95 verifyAndAssignMetaWithRetries();
96 break;
97 case UNKNOWN:
98 if (!server.getMetaTableLocator().isLocationAvailable(this.server.getZooKeeper())) {
99
100
101
102 verifyAndAssignMetaWithRetries();
103 break;
104 }
105
106 case NOT_HOSTING_REGION:
107 LOG.info("META has been assigned to otherwhere, skip assigning.");
108 break;
109 default:
110 throw new IOException("Unsupported action in MetaServerShutdownHandler");
111 }
112
113 try {
114 if (this.shouldSplitWal && distributedLogReplay) {
115 if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
116 regionAssignmentWaitTimeout)) {
117
118
119 LOG.warn("Region " + HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()
120 + " didn't complete assignment in time");
121 }
122 this.services.getMasterFileSystem().splitMetaLog(serverName);
123 }
124 } catch (Exception ex) {
125 if (ex instanceof IOException) {
126 this.services.getExecutorService().submit(this);
127 this.deadServers.add(serverName);
128 throw new IOException("failed log splitting for " + serverName + ", will retry", ex);
129 } else {
130 throw new IOException(ex);
131 }
132 }
133
134 gotException = false;
135 } finally {
136 if (gotException){
137
138 this.deadServers.finish(serverName);
139 }
140 }
141
142 super.process();
143
144 this.eventExceptionCount.set(0);
145 }
146
147 @Override
148 boolean isCarryingMeta() {
149 return true;
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163 private void verifyAndAssignMeta()
164 throws InterruptedException, IOException, KeeperException {
165 long timeout = this.server.getConfiguration().
166 getLong("hbase.catalog.verification.timeout", 1000);
167 if (!server.getMetaTableLocator().verifyMetaRegionLocation(server.getConnection(),
168 this.server.getZooKeeper(), timeout)) {
169 this.services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO);
170 } else if (serverName.equals(server.getMetaTableLocator().getMetaRegionLocation(
171 this.server.getZooKeeper()))) {
172
173
174 this.services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO);
175 } else {
176 LOG.info("Skip assigning hbase:meta, because it is online on the "
177 + server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper()));
178 }
179 }
180
181
182
183
184
185 private void verifyAndAssignMetaWithRetries() throws IOException {
186 int iTimes = this.server.getConfiguration().getInt(
187 "hbase.catalog.verification.retries", 10);
188
189 long waitTime = this.server.getConfiguration().getLong(
190 "hbase.catalog.verification.timeout", 1000);
191
192 int iFlag = 0;
193 while (true) {
194 try {
195 verifyAndAssignMeta();
196 break;
197 } catch (KeeperException e) {
198 this.server.abort("In server shutdown processing, assigning meta", e);
199 throw new IOException("Aborting", e);
200 } catch (Exception e) {
201 if (iFlag >= iTimes) {
202 this.server.abort("verifyAndAssignMeta failed after" + iTimes
203 + " times retries, aborting", e);
204 throw new IOException("Aborting", e);
205 }
206 try {
207 Thread.sleep(waitTime);
208 } catch (InterruptedException e1) {
209 LOG.warn("Interrupted when is the thread sleep", e1);
210 Thread.currentThread().interrupt();
211 throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
212 }
213 iFlag++;
214 }
215 }
216 }
217
218 @Override
219 protected void handleException(Throwable t) {
220 int count = eventExceptionCount.getAndIncrement();
221 if (count < 0) count = eventExceptionCount.getAndSet(0);
222 if (count > SHOW_STRACKTRACE_FREQUENCY) {
223 Threads.sleep(1000);
224 }
225 if (count % SHOW_STRACKTRACE_FREQUENCY == 0) {
226 LOG.error("Caught " + eventType + ", count=" + this.eventExceptionCount, t);
227 } else {
228 LOG.error("Caught " + eventType + ", count=" + this.eventExceptionCount +
229 "; " + t.getMessage() + "; stack trace shows every " + SHOW_STRACKTRACE_FREQUENCY +
230 "th time.");
231 }
232 }
233 }