1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coordination;
21
22 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
23 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
24 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
25
26 import java.io.IOException;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.CoordinatedStateManager;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.RegionTransition;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.executor.EventType;
36 import org.apache.hadoop.hbase.regionserver.HRegion;
37 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
38 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
40 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42 import org.apache.zookeeper.KeeperException;
43 import org.apache.zookeeper.data.Stat;
44
45 public class ZkRegionMergeCoordination implements RegionMergeCoordination {
46
private static final Log LOG = LogFactory.getLog(ZkRegionMergeCoordination.class);

/**
 * Gives access to the hosting server via {@code manager.getServer()}; used to read the server
 * name, check whether the server is stopped, and abort it on cleanup failure.
 */
private final CoordinatedStateManager manager;

/** ZooKeeper handle handed to every {@code ZKAssign} / {@code ZKUtil} call made by this class. */
private final ZooKeeperWatcher watcher;

/**
 * Creates a ZooKeeper-backed region merge coordination.
 *
 * @param manager coordinated state manager whose server is consulted by the merge operations
 * @param watcher ZooKeeper watcher used for all znode reads, writes and deletes
 */
public ZkRegionMergeCoordination(CoordinatedStateManager manager,
    ZooKeeperWatcher watcher) {
  // Both references are set exactly once here and never reassigned, hence final.
  this.manager = manager;
  this.watcher = watcher;
}
57
58
59
60
61
62 public static class ZkRegionMergeDetails implements RegionMergeCoordination.RegionMergeDetails {
63 private int znodeVersion;
64
65 public ZkRegionMergeDetails() {
66 }
67
68 public int getZnodeVersion() {
69 return znodeVersion;
70 }
71
72 public void setZnodeVersion(int znodeVersion) {
73 this.znodeVersion = znodeVersion;
74 }
75 }
76
77 @Override
78 public RegionMergeDetails getDefaultDetails() {
79 ZkRegionMergeDetails zstd = new ZkRegionMergeDetails();
80 zstd.setZnodeVersion(-1);
81 return zstd;
82 }
83
84
85
86
87
88
89
90
91
92
93
94 @Override
95 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
96 justification="Intended")
97 public void waitForRegionMergeTransaction(RegionServerServices services,
98 HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details)
99 throws IOException {
100 try {
101 int spins = 0;
102 Stat stat = new Stat();
103 ServerName expectedServer = manager.getServer().getServerName();
104 String node = mergedRegionInfo.getEncodedName();
105 ZkRegionMergeDetails zdetails = (ZkRegionMergeDetails) details;
106 while (!(manager.getServer().isStopped() || services.isStopping())) {
107 if (spins % 5 == 0) {
108 LOG.debug("Still waiting for master to process " + "the pending_merge for " + node);
109 ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) getDefaultDetails();
110 transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(),
111 region_b.getRegionInfo(), expectedServer, zrmd, RS_ZK_REQUEST_REGION_MERGE,
112 RS_ZK_REQUEST_REGION_MERGE);
113 }
114 Thread.sleep(100);
115 spins++;
116 byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
117 if (data == null) {
118 throw new IOException("Data is null, merging node " + node + " no longer exists");
119 }
120 RegionTransition rt = RegionTransition.parseFrom(data);
121 EventType et = rt.getEventType();
122 if (et == RS_ZK_REGION_MERGING) {
123 ServerName serverName = rt.getServerName();
124 if (!serverName.equals(expectedServer)) {
125 throw new IOException("Merging node " + node + " is for " + serverName + ", not us "
126 + expectedServer);
127 }
128 byte[] payloadOfMerging = rt.getPayload();
129 List<HRegionInfo> mergingRegions =
130 HRegionInfo.parseDelimitedFrom(payloadOfMerging, 0, payloadOfMerging.length);
131 assert mergingRegions.size() == 3;
132 HRegionInfo a = mergingRegions.get(1);
133 HRegionInfo b = mergingRegions.get(2);
134 HRegionInfo hri_a = region_a.getRegionInfo();
135 HRegionInfo hri_b = region_b.getRegionInfo();
136 if (!(hri_a.equals(a) && hri_b.equals(b))) {
137 throw new IOException("Merging node " + node + " is for " + a + ", " + b
138 + ", not expected regions: " + hri_a + ", " + hri_b);
139 }
140
141 zdetails.setZnodeVersion(stat.getVersion());
142 return;
143 }
144 if (et != RS_ZK_REQUEST_REGION_MERGE) {
145 throw new IOException("Merging node " + node + " moved out of merging to " + et);
146 }
147 }
148
149 throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
150 } catch (Exception e) {
151 if (e instanceof InterruptedException) {
152 Thread.currentThread().interrupt();
153 }
154 throw new IOException("Failed getting MERGING znode on "
155 + mergedRegionInfo.getRegionNameAsString(), e);
156 }
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171 @Override
172 public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName,
173 final HRegionInfo a, final HRegionInfo b) throws IOException {
174 LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
175 + " in PENDING_MERGE state"));
176 byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
177 RegionTransition rt =
178 RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(),
179 serverName, payload);
180 String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
181 try {
182 if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
183 throw new IOException("Failed create of ephemeral " + node);
184 }
185 } catch (KeeperException e) {
186 throw new IOException(e);
187 }
188 }
189
190
191
192
193
194
195
196 @Override
197 public void clean(final HRegionInfo hri) {
198 try {
199
200 if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager
201 .getServer().getServerName())) {
202 ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager
203 .getServer().getServerName());
204 }
205 } catch (KeeperException.NoNodeException e) {
206 LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
207 } catch (KeeperException e) {
208 manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
209 }
210 }
211
212
213
214
215 @Override
216 public void completeRegionMergeTransaction(final RegionServerServices services,
217 HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd,
218 HRegion mergedRegion) throws IOException {
219 ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
220 if (manager.getServer() == null
221 || manager.getServer().getCoordinatedStateManager() == null) {
222 return;
223 }
224
225 try {
226 transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
227 manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
228
229 long startTime = EnvironmentEdgeManager.currentTime();
230 int spins = 0;
231
232
233
234 do {
235 if (spins % 10 == 0) {
236 LOG.debug("Still waiting on the master to process the merge for "
237 + mergedRegionInfo.getEncodedName() + ", waited "
238 + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
239 }
240 Thread.sleep(100);
241
242 transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
243 manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
244 spins++;
245 } while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped()
246 && !services.isStopping());
247 } catch (Exception e) {
248 if (e instanceof InterruptedException) {
249 Thread.currentThread().interrupt();
250 }
251 throw new IOException("Failed telling master about merge "
252 + mergedRegionInfo.getEncodedName(), e);
253 }
254
255
256
257 }
258
259
260
261
262 @Override
263 public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
264 ServerName serverName, RegionMergeDetails rmd) throws IOException {
265 transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING,
266 RS_ZK_REGION_MERGING);
267 }
268
269
270
271
272 @Override
273 public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b,
274 ServerName sn, RegionMergeDetails rmd) throws IOException {
275 transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE,
276 EventType.RS_ZK_REGION_MERGING);
277 }
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315 private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
316 ServerName serverName, RegionMergeDetails rmd, final EventType beginState,
317 final EventType endState) throws IOException {
318 ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
319 byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
320 try {
321 zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState,
322 endState, zrmd.getZnodeVersion(), payload));
323 } catch (KeeperException e) {
324 throw new IOException(e);
325 }
326 }
327 }