1
2
3
4
5
6
7
8
9
10
11
12 package org.apache.hadoop.hbase.coordination;
13
14 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
15 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
16 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
17
18 import java.io.IOException;
19 import java.util.List;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.CoordinatedStateManager;
24 import org.apache.hadoop.hbase.HRegionInfo;
25 import org.apache.hadoop.hbase.RegionTransition;
26 import org.apache.hadoop.hbase.ServerName;
27 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
28 import org.apache.hadoop.hbase.executor.EventType;
29 import org.apache.hadoop.hbase.regionserver.HRegion;
30 import org.apache.hadoop.hbase.regionserver.Region;
31 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
32 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
33 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
34 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
35 import org.apache.zookeeper.KeeperException;
36 import org.apache.zookeeper.data.Stat;
37
38 public class ZKSplitTransactionCoordination implements SplitTransactionCoordination {
39
40 private CoordinatedStateManager coordinationManager;
41 private final ZooKeeperWatcher watcher;
42
43 private static final Log LOG = LogFactory.getLog(ZKSplitTransactionCoordination.class);
44
45 public ZKSplitTransactionCoordination(CoordinatedStateManager coordinationProvider,
46 ZooKeeperWatcher watcher) {
47 this.coordinationManager = coordinationProvider;
48 this.watcher = watcher;
49 }
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @Override
65 public void startSplitTransaction(HRegion parent, ServerName serverName, HRegionInfo hri_a,
66 HRegionInfo hri_b) throws IOException {
67
68 HRegionInfo region = parent.getRegionInfo();
69 try {
70
71 LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
72 + " in PENDING_SPLIT state"));
73 byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
74 RegionTransition rt =
75 RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_SPLIT,
76 region.getRegionName(), serverName, payload);
77 String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
78 if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
79 throw new IOException("Failed create of ephemeral " + node);
80 }
81
82 } catch (KeeperException e) {
83 throw new IOException("Failed creating PENDING_SPLIT znode on "
84 + parent.getRegionInfo().getRegionNameAsString(), e);
85 }
86
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 private int transitionSplittingNode(HRegionInfo parent, HRegionInfo a, HRegionInfo b,
123 ServerName serverName, SplitTransactionDetails std, final EventType beginState,
124 final EventType endState) throws IOException {
125 ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
126 byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
127 try {
128 return ZKAssign.transitionNode(watcher, parent, serverName, beginState, endState,
129 zstd.getZnodeVersion(), payload);
130 } catch (KeeperException e) {
131 throw new IOException(
132 "Failed transition of splitting node " + parent.getRegionNameAsString(), e);
133 }
134 }
135
136
137
138
139
140
141
142 @Override
143 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
144 justification="Intended")
145 public void waitForSplitTransaction(final RegionServerServices services, Region parent,
146 HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails sptd) throws IOException {
147 ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) sptd;
148
149
150
151
152 try {
153 int spins = 0;
154 Stat stat = new Stat();
155 ServerName expectedServer = coordinationManager.getServer().getServerName();
156 String node = parent.getRegionInfo().getEncodedName();
157 while (!(coordinationManager.getServer().isStopped() || services.isStopping())) {
158 if (spins % 5 == 0) {
159 LOG.debug("Still waiting for master to process " + "the pending_split for " + node);
160 SplitTransactionDetails temp = getDefaultDetails();
161 transitionSplittingNode(parent.getRegionInfo(), hri_a, hri_b, expectedServer, temp,
162 RS_ZK_REQUEST_REGION_SPLIT, RS_ZK_REQUEST_REGION_SPLIT);
163 }
164 Thread.sleep(100);
165 spins++;
166 byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
167 if (data == null) {
168 throw new IOException("Data is null, splitting node " + node + " no longer exists");
169 }
170 RegionTransition rt = RegionTransition.parseFrom(data);
171 EventType et = rt.getEventType();
172 if (et == RS_ZK_REGION_SPLITTING) {
173 ServerName serverName = rt.getServerName();
174 if (!serverName.equals(expectedServer)) {
175 throw new IOException("Splitting node " + node + " is for " + serverName + ", not us "
176 + expectedServer);
177 }
178 byte[] payloadOfSplitting = rt.getPayload();
179 List<HRegionInfo> splittingRegions =
180 HRegionInfo.parseDelimitedFrom(payloadOfSplitting, 0, payloadOfSplitting.length);
181 assert splittingRegions.size() == 2;
182 HRegionInfo a = splittingRegions.get(0);
183 HRegionInfo b = splittingRegions.get(1);
184 if (!(hri_a.equals(a) && hri_b.equals(b))) {
185 throw new IOException("Splitting node " + node + " is for " + a + ", " + b
186 + ", not expected daughters: " + hri_a + ", " + hri_b);
187 }
188
189 zstd.setZnodeVersion(stat.getVersion());
190 return;
191 }
192 if (et != RS_ZK_REQUEST_REGION_SPLIT) {
193 throw new IOException("Splitting node " + node + " moved out of splitting to " + et);
194 }
195 }
196
197 throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
198 } catch (Exception e) {
199 if (e instanceof InterruptedException) {
200 Thread.currentThread().interrupt();
201 }
202 throw new IOException("Failed getting SPLITTING znode on " +
203 parent.getRegionInfo().getRegionNameAsString(), e);
204 }
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218 @Override
219 public void completeSplitTransaction(final RegionServerServices services, Region a, Region b,
220 SplitTransactionDetails std, Region parent) throws IOException {
221 ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
222
223 if (coordinationManager.getServer() != null) {
224 try {
225 zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
226 b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
227 RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT));
228
229 int spins = 0;
230
231
232
233 do {
234 if (spins % 10 == 0) {
235 LOG.debug("Still waiting on the master to process the split for "
236 + parent.getRegionInfo().getEncodedName());
237 }
238 Thread.sleep(100);
239
240 zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
241 b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
242 RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT));
243 spins++;
244 } while (zstd.getZnodeVersion() != -1 && !coordinationManager.getServer().isStopped()
245 && !services.isStopping());
246 } catch (Exception e) {
247 if (e instanceof InterruptedException) {
248 Thread.currentThread().interrupt();
249 }
250 throw new IOException("Failed telling master about split", e);
251 }
252 }
253
254
255
256
257 }
258
259 @Override
260 public void clean(final HRegionInfo hri) {
261 try {
262
263 if (!ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(),
264 hri.getEncodedName(), RS_ZK_REQUEST_REGION_SPLIT, coordinationManager.getServer()
265 .getServerName())) {
266 ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(), hri.getEncodedName(),
267 RS_ZK_REGION_SPLITTING, coordinationManager.getServer().getServerName());
268 }
269 } catch (KeeperException.NoNodeException e) {
270 LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
271 } catch (KeeperException e) {
272 coordinationManager.getServer().abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
273 }
274 }
275
276
277
278
279
280 public static class ZkSplitTransactionDetails implements
281 SplitTransactionCoordination.SplitTransactionDetails {
282 private int znodeVersion;
283
284 public ZkSplitTransactionDetails() {
285 }
286
287
288
289
290 public int getZnodeVersion() {
291 return znodeVersion;
292 }
293
294
295
296
297 public void setZnodeVersion(int znodeVersion) {
298 this.znodeVersion = znodeVersion;
299 }
300 }
301
302 @Override
303 public SplitTransactionDetails getDefaultDetails() {
304 ZkSplitTransactionDetails zstd = new ZkSplitTransactionDetails();
305 zstd.setZnodeVersion(-1);
306 return zstd;
307 }
308
309 @Override
310 public int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, ServerName sn,
311 SplitTransactionDetails std) throws IOException {
312 return transitionSplittingNode(p, hri_a, hri_b, sn, std, RS_ZK_REQUEST_REGION_SPLIT,
313 RS_ZK_REGION_SPLITTING);
314
315 }
316 }