1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coordination;
21
22 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
23 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
24 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
25
26 import java.io.IOException;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.CoordinatedStateManager;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.RegionTransition;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.executor.EventType;
36 import org.apache.hadoop.hbase.regionserver.HRegion;
37 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
38 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
40 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42 import org.apache.zookeeper.KeeperException;
43 import org.apache.zookeeper.data.Stat;
44
45 public class ZkRegionMergeCoordination implements RegionMergeCoordination {
46
private static final Log LOG = LogFactory.getLog(ZkRegionMergeCoordination.class);

/** State manager; this class uses it only to reach the hosting server via getServer(). */
private final CoordinatedStateManager manager;

/** ZooKeeper watcher passed to every ZKAssign/ZKUtil call made by this class. */
private final ZooKeeperWatcher watcher;

/**
 * Creates the ZooKeeper-backed region merge coordination.
 *
 * @param manager coordinated state manager used to look up the hosting server
 * @param watcher ZooKeeper watcher used for all znode operations
 */
public ZkRegionMergeCoordination(CoordinatedStateManager manager,
    ZooKeeperWatcher watcher) {
  // Both collaborators are fixed for the lifetime of this object, hence final fields.
  this.manager = manager;
  this.watcher = watcher;
}
57
58
59
60
61
62 public static class ZkRegionMergeDetails implements RegionMergeCoordination.RegionMergeDetails {
63 private int znodeVersion;
64
65 public ZkRegionMergeDetails() {
66 }
67
68 public int getZnodeVersion() {
69 return znodeVersion;
70 }
71
72 public void setZnodeVersion(int znodeVersion) {
73 this.znodeVersion = znodeVersion;
74 }
75 }
76
77 @Override
78 public RegionMergeDetails getDefaultDetails() {
79 ZkRegionMergeDetails zstd = new ZkRegionMergeDetails();
80 zstd.setZnodeVersion(-1);
81 return zstd;
82 }
83
84
85
86
87
88
89
90
91
92
93
94 @Override
95 public void waitForRegionMergeTransaction(RegionServerServices services,
96 HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details)
97 throws IOException {
98 try {
99 int spins = 0;
100 Stat stat = new Stat();
101 ServerName expectedServer = manager.getServer().getServerName();
102 String node = mergedRegionInfo.getEncodedName();
103 ZkRegionMergeDetails zdetails = (ZkRegionMergeDetails) details;
104 while (!(manager.getServer().isStopped() || services.isStopping())) {
105 if (spins % 5 == 0) {
106 LOG.debug("Still waiting for master to process " + "the pending_merge for " + node);
107 ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) getDefaultDetails();
108 transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(),
109 region_b.getRegionInfo(), expectedServer, zrmd, RS_ZK_REQUEST_REGION_MERGE,
110 RS_ZK_REQUEST_REGION_MERGE);
111 }
112 Thread.sleep(100);
113 spins++;
114 byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
115 if (data == null) {
116 throw new IOException("Data is null, merging node " + node + " no longer exists");
117 }
118 RegionTransition rt = RegionTransition.parseFrom(data);
119 EventType et = rt.getEventType();
120 if (et == RS_ZK_REGION_MERGING) {
121 ServerName serverName = rt.getServerName();
122 if (!serverName.equals(expectedServer)) {
123 throw new IOException("Merging node " + node + " is for " + serverName + ", not us "
124 + expectedServer);
125 }
126 byte[] payloadOfMerging = rt.getPayload();
127 List<HRegionInfo> mergingRegions =
128 HRegionInfo.parseDelimitedFrom(payloadOfMerging, 0, payloadOfMerging.length);
129 assert mergingRegions.size() == 3;
130 HRegionInfo a = mergingRegions.get(1);
131 HRegionInfo b = mergingRegions.get(2);
132 HRegionInfo hri_a = region_a.getRegionInfo();
133 HRegionInfo hri_b = region_b.getRegionInfo();
134 if (!(hri_a.equals(a) && hri_b.equals(b))) {
135 throw new IOException("Merging node " + node + " is for " + a + ", " + b
136 + ", not expected regions: " + hri_a + ", " + hri_b);
137 }
138
139 zdetails.setZnodeVersion(stat.getVersion());
140 return;
141 }
142 if (et != RS_ZK_REQUEST_REGION_MERGE) {
143 throw new IOException("Merging node " + node + " moved out of merging to " + et);
144 }
145 }
146
147 throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
148 } catch (Exception e) {
149 if (e instanceof InterruptedException) {
150 Thread.currentThread().interrupt();
151 }
152 throw new IOException("Failed getting MERGING znode on "
153 + mergedRegionInfo.getRegionNameAsString(), e);
154 }
155 }
156
157
158
159
160
161
162
163
164
165
166
167
168
169 @Override
170 public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName,
171 final HRegionInfo a, final HRegionInfo b) throws IOException {
172 LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
173 + " in PENDING_MERGE state"));
174 byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
175 RegionTransition rt =
176 RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(),
177 serverName, payload);
178 String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
179 try {
180 if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
181 throw new IOException("Failed create of ephemeral " + node);
182 }
183 } catch (KeeperException e) {
184 throw new IOException(e);
185 }
186 }
187
188
189
190
191
192
193
194 @Override
195 public void clean(final HRegionInfo hri) {
196 try {
197
198 if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager
199 .getServer().getServerName())) {
200 ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager
201 .getServer().getServerName());
202 }
203 } catch (KeeperException.NoNodeException e) {
204 LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
205 } catch (KeeperException e) {
206 manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
207 }
208 }
209
210
211
212
213 @Override
214 public void completeRegionMergeTransaction(final RegionServerServices services,
215 HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd,
216 HRegion mergedRegion) throws IOException {
217 ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
218 if (manager.getServer() == null
219 || manager.getServer().getCoordinatedStateManager() == null) {
220 return;
221 }
222
223 try {
224 transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
225 manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
226
227 long startTime = EnvironmentEdgeManager.currentTime();
228 int spins = 0;
229
230
231
232 do {
233 if (spins % 10 == 0) {
234 LOG.debug("Still waiting on the master to process the merge for "
235 + mergedRegionInfo.getEncodedName() + ", waited "
236 + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
237 }
238 Thread.sleep(100);
239
240 transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
241 manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
242 spins++;
243 } while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped()
244 && !services.isStopping());
245 } catch (Exception e) {
246 if (e instanceof InterruptedException) {
247 Thread.currentThread().interrupt();
248 }
249 throw new IOException("Failed telling master about merge "
250 + mergedRegionInfo.getEncodedName(), e);
251 }
252
253
254
255 }
256
257
258
259
260 @Override
261 public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
262 ServerName serverName, RegionMergeDetails rmd) throws IOException {
263 transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING,
264 RS_ZK_REGION_MERGING);
265 }
266
267
268
269
270 @Override
271 public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b,
272 ServerName sn, RegionMergeDetails rmd) throws IOException {
273 transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE,
274 EventType.RS_ZK_REGION_MERGING);
275 }
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313 private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
314 ServerName serverName, RegionMergeDetails rmd, final EventType beginState,
315 final EventType endState) throws IOException {
316 ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
317 byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
318 try {
319 zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState,
320 endState, zrmd.getZnodeVersion(), payload));
321 } catch (KeeperException e) {
322 throw new IOException(e);
323 }
324 }
325 }