1
2
3
4
5
6
7
8
9
10
11
12 package org.apache.hadoop.hbase.coordination;
13
14 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
15 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
16 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
17
18 import java.io.IOException;
19 import java.util.List;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.CoordinatedStateManager;
24 import org.apache.hadoop.hbase.HRegionInfo;
25 import org.apache.hadoop.hbase.RegionTransition;
26 import org.apache.hadoop.hbase.ServerName;
27 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
28 import org.apache.hadoop.hbase.executor.EventType;
29 import org.apache.hadoop.hbase.regionserver.HRegion;
30 import org.apache.hadoop.hbase.regionserver.Region;
31 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
32 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
33 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
34 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
35 import org.apache.zookeeper.KeeperException;
36 import org.apache.zookeeper.data.Stat;
37
38 public class ZKSplitTransactionCoordination implements SplitTransactionCoordination {
39
/**
 * Coordinated state manager through which the hosting server is reached. Assigned once in the
 * constructor and never reassigned, hence final (consistent with {@link #watcher}).
 */
private final CoordinatedStateManager coordinationManager;

/** ZooKeeper watcher used when creating, reading and transitioning the split znode. */
private final ZooKeeperWatcher watcher;

private static final Log LOG = LogFactory.getLog(ZKSplitTransactionCoordination.class);

/**
 * Builds a ZooKeeper-backed split transaction coordination.
 *
 * @param coordinationProvider coordinated state manager used to look up the hosting server
 * @param watcher ZooKeeper watcher used for the split znode create/read/transition calls
 */
public ZKSplitTransactionCoordination(CoordinatedStateManager coordinationProvider,
    ZooKeeperWatcher watcher) {
  this.coordinationManager = coordinationProvider;
  this.watcher = watcher;
}
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @Override
65 public void startSplitTransaction(HRegion parent, ServerName serverName, HRegionInfo hri_a,
66 HRegionInfo hri_b) throws IOException {
67
68 HRegionInfo region = parent.getRegionInfo();
69 try {
70
71 LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
72 + " in PENDING_SPLIT state"));
73 byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
74 RegionTransition rt =
75 RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_SPLIT,
76 region.getRegionName(), serverName, payload);
77 String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
78 if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
79 throw new IOException("Failed create of ephemeral " + node);
80 }
81
82 } catch (KeeperException e) {
83 throw new IOException("Failed creating PENDING_SPLIT znode on "
84 + parent.getRegionInfo().getRegionNameAsString(), e);
85 }
86
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 private int transitionSplittingNode(HRegionInfo parent, HRegionInfo a, HRegionInfo b,
123 ServerName serverName, SplitTransactionDetails std, final EventType beginState,
124 final EventType endState) throws IOException {
125 ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
126 byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
127 try {
128 return ZKAssign.transitionNode(watcher, parent, serverName, beginState, endState,
129 zstd.getZnodeVersion(), payload);
130 } catch (KeeperException e) {
131 throw new IOException(
132 "Failed transition of splitting node " + parent.getRegionNameAsString(), e);
133 }
134 }
135
136
137
138
139
140
141
142 @Override
143 public void waitForSplitTransaction(final RegionServerServices services, Region parent,
144 HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails sptd) throws IOException {
145 ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) sptd;
146
147
148
149
150 try {
151 int spins = 0;
152 Stat stat = new Stat();
153 ServerName expectedServer = coordinationManager.getServer().getServerName();
154 String node = parent.getRegionInfo().getEncodedName();
155 while (!(coordinationManager.getServer().isStopped() || services.isStopping())) {
156 if (spins % 5 == 0) {
157 LOG.debug("Still waiting for master to process " + "the pending_split for " + node);
158 SplitTransactionDetails temp = getDefaultDetails();
159 transitionSplittingNode(parent.getRegionInfo(), hri_a, hri_b, expectedServer, temp,
160 RS_ZK_REQUEST_REGION_SPLIT, RS_ZK_REQUEST_REGION_SPLIT);
161 }
162 Thread.sleep(100);
163 spins++;
164 byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
165 if (data == null) {
166 throw new IOException("Data is null, splitting node " + node + " no longer exists");
167 }
168 RegionTransition rt = RegionTransition.parseFrom(data);
169 EventType et = rt.getEventType();
170 if (et == RS_ZK_REGION_SPLITTING) {
171 ServerName serverName = rt.getServerName();
172 if (!serverName.equals(expectedServer)) {
173 throw new IOException("Splitting node " + node + " is for " + serverName + ", not us "
174 + expectedServer);
175 }
176 byte[] payloadOfSplitting = rt.getPayload();
177 List<HRegionInfo> splittingRegions =
178 HRegionInfo.parseDelimitedFrom(payloadOfSplitting, 0, payloadOfSplitting.length);
179 assert splittingRegions.size() == 2;
180 HRegionInfo a = splittingRegions.get(0);
181 HRegionInfo b = splittingRegions.get(1);
182 if (!(hri_a.equals(a) && hri_b.equals(b))) {
183 throw new IOException("Splitting node " + node + " is for " + a + ", " + b
184 + ", not expected daughters: " + hri_a + ", " + hri_b);
185 }
186
187 zstd.setZnodeVersion(stat.getVersion());
188 return;
189 }
190 if (et != RS_ZK_REQUEST_REGION_SPLIT) {
191 throw new IOException("Splitting node " + node + " moved out of splitting to " + et);
192 }
193 }
194
195 throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
196 } catch (Exception e) {
197 if (e instanceof InterruptedException) {
198 Thread.currentThread().interrupt();
199 }
200 throw new IOException("Failed getting SPLITTING znode on " +
201 parent.getRegionInfo().getRegionNameAsString(), e);
202 }
203 }
204
205
206
207
208
209
210
211
212
213
214
215
216 @Override
217 public void completeSplitTransaction(final RegionServerServices services, Region a, Region b,
218 SplitTransactionDetails std, Region parent) throws IOException {
219 ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
220
221 if (coordinationManager.getServer() != null) {
222 try {
223 zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
224 b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
225 RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT));
226
227 int spins = 0;
228
229
230
231 do {
232 if (spins % 10 == 0) {
233 LOG.debug("Still waiting on the master to process the split for "
234 + parent.getRegionInfo().getEncodedName());
235 }
236 Thread.sleep(100);
237
238 zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
239 b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
240 RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT));
241 spins++;
242 } while (zstd.getZnodeVersion() != -1 && !coordinationManager.getServer().isStopped()
243 && !services.isStopping());
244 } catch (Exception e) {
245 if (e instanceof InterruptedException) {
246 Thread.currentThread().interrupt();
247 }
248 throw new IOException("Failed telling master about split", e);
249 }
250 }
251
252
253
254
255 }
256
257 @Override
258 public void clean(final HRegionInfo hri) {
259 try {
260
261 if (!ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(),
262 hri.getEncodedName(), RS_ZK_REQUEST_REGION_SPLIT, coordinationManager.getServer()
263 .getServerName())) {
264 ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(), hri.getEncodedName(),
265 RS_ZK_REGION_SPLITTING, coordinationManager.getServer().getServerName());
266 }
267 } catch (KeeperException.NoNodeException e) {
268 LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
269 } catch (KeeperException e) {
270 coordinationManager.getServer().abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
271 }
272 }
273
274
275
276
277
278 public static class ZkSplitTransactionDetails implements
279 SplitTransactionCoordination.SplitTransactionDetails {
280 private int znodeVersion;
281
282 public ZkSplitTransactionDetails() {
283 }
284
285
286
287
288 public int getZnodeVersion() {
289 return znodeVersion;
290 }
291
292
293
294
295 public void setZnodeVersion(int znodeVersion) {
296 this.znodeVersion = znodeVersion;
297 }
298 }
299
300 @Override
301 public SplitTransactionDetails getDefaultDetails() {
302 ZkSplitTransactionDetails zstd = new ZkSplitTransactionDetails();
303 zstd.setZnodeVersion(-1);
304 return zstd;
305 }
306
307 @Override
308 public int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, ServerName sn,
309 SplitTransactionDetails std) throws IOException {
310 return transitionSplittingNode(p, hri_a, hri_b, sn, std, RS_ZK_REQUEST_REGION_SPLIT,
311 RS_ZK_REGION_SPLITTING);
312
313 }
314 }