View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coordination;
21  
22  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
23  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
24  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
25  
26  import java.io.IOException;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.CoordinatedStateManager;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.RegionTransition;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.executor.EventType;
36  import org.apache.hadoop.hbase.regionserver.HRegion;
37  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
40  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42  import org.apache.zookeeper.KeeperException;
43  import org.apache.zookeeper.data.Stat;
44  
45  public class ZkRegionMergeCoordination implements RegionMergeCoordination {
46  
private static final Log LOG = LogFactory.getLog(ZkRegionMergeCoordination.class);

/** Coordinated state manager; used in this class to reach the hosting server. */
private final CoordinatedStateManager manager;

/** ZooKeeper watcher handed to every ZKAssign/ZKUtil call made by this class. */
private final ZooKeeperWatcher watcher;

/**
 * Creates a ZooKeeper-backed region merge coordination.
 *
 * @param manager coordinated state manager that gives access to the hosting server
 * @param watcher ZooKeeper watcher used for all znode operations
 */
public ZkRegionMergeCoordination(CoordinatedStateManager manager,
    ZooKeeperWatcher watcher) {
  this.manager = manager;
  this.watcher = watcher;
}
57  
58    /**
59     * ZK-based implementation. Has details about whether the state transition should be reflected in
60     * ZK, as well as expected version of znode.
61     */
62    public static class ZkRegionMergeDetails implements RegionMergeCoordination.RegionMergeDetails {
63      private int znodeVersion;
64  
65      public ZkRegionMergeDetails() {
66      }
67  
68      public int getZnodeVersion() {
69        return znodeVersion;
70      }
71  
72      public void setZnodeVersion(int znodeVersion) {
73        this.znodeVersion = znodeVersion;
74      }
75    }
76  
77    @Override
78    public RegionMergeDetails getDefaultDetails() {
79      ZkRegionMergeDetails zstd = new ZkRegionMergeDetails();
80      zstd.setZnodeVersion(-1);
81      return zstd;
82    }
83  
84    /**
85     * Wait for the merging node to be transitioned from pending_merge
86     * to merging by master. That's how we are sure master has processed
87     * the event and is good with us to move on. If we don't get any update,
88     * we periodically transition the node so that master gets the callback.
89     * If the node is removed or is not in pending_merge state any more,
90     * we abort the merge.
91     * @throws IOException
92     */
93  
94    @Override
95    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
96      justification="Intended")
97    public void waitForRegionMergeTransaction(RegionServerServices services,
98        HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details)
99        throws IOException {
100     try {
101       int spins = 0;
102       Stat stat = new Stat();
103       ServerName expectedServer = manager.getServer().getServerName();
104       String node = mergedRegionInfo.getEncodedName();
105       ZkRegionMergeDetails zdetails = (ZkRegionMergeDetails) details;
106       while (!(manager.getServer().isStopped() || services.isStopping())) {
107         if (spins % 5 == 0) {
108           LOG.debug("Still waiting for master to process " + "the pending_merge for " + node);
109           ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) getDefaultDetails();
110           transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(),
111             region_b.getRegionInfo(), expectedServer, zrmd, RS_ZK_REQUEST_REGION_MERGE,
112             RS_ZK_REQUEST_REGION_MERGE);
113         }
114         Thread.sleep(100);
115         spins++;
116         byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
117         if (data == null) {
118           throw new IOException("Data is null, merging node " + node + " no longer exists");
119         }
120         RegionTransition rt = RegionTransition.parseFrom(data);
121         EventType et = rt.getEventType();
122         if (et == RS_ZK_REGION_MERGING) {
123           ServerName serverName = rt.getServerName();
124           if (!serverName.equals(expectedServer)) {
125             throw new IOException("Merging node " + node + " is for " + serverName + ", not us "
126                 + expectedServer);
127           }
128           byte[] payloadOfMerging = rt.getPayload();
129           List<HRegionInfo> mergingRegions =
130               HRegionInfo.parseDelimitedFrom(payloadOfMerging, 0, payloadOfMerging.length);
131           assert mergingRegions.size() == 3;
132           HRegionInfo a = mergingRegions.get(1);
133           HRegionInfo b = mergingRegions.get(2);
134           HRegionInfo hri_a = region_a.getRegionInfo();
135           HRegionInfo hri_b = region_b.getRegionInfo();
136           if (!(hri_a.equals(a) && hri_b.equals(b))) {
137             throw new IOException("Merging node " + node + " is for " + a + ", " + b
138                 + ", not expected regions: " + hri_a + ", " + hri_b);
139           }
140           // Master has processed it.
141           zdetails.setZnodeVersion(stat.getVersion());
142           return;
143         }
144         if (et != RS_ZK_REQUEST_REGION_MERGE) {
145           throw new IOException("Merging node " + node + " moved out of merging to " + et);
146         }
147       }
148       // Server is stopping/stopped
149       throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
150     } catch (Exception e) {
151       if (e instanceof InterruptedException) {
152         Thread.currentThread().interrupt();
153       }
154       throw new IOException("Failed getting MERGING znode on "
155           + mergedRegionInfo.getRegionNameAsString(), e);
156     }
157   }
158 
159   /**
160    * Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
161    * Create it ephemeral in case regionserver dies mid-merge.
162    *
163    * <p>
164    * Does not transition nodes from other states. If a node already exists for
165    * this region, a {@link org.apache.zookeeper.KeeperException.NodeExistsException} will be thrown.
166    *
167    * @param region region to be created as offline
168    * @param serverName server event originates from
169    * @throws IOException
170    */
171   @Override
172   public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName,
173       final HRegionInfo a, final HRegionInfo b) throws IOException {
174     LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
175         + " in PENDING_MERGE state"));
176     byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
177     RegionTransition rt =
178         RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(),
179           serverName, payload);
180     String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
181     try {
182       if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
183         throw new IOException("Failed create of ephemeral " + node);
184       }
185     } catch (KeeperException e) {
186       throw new IOException(e);
187     }
188   }
189 
190   /*
191    * (non-Javadoc)
192    * @see
193    * org.apache.hadoop.hbase.regionserver.coordination.RegionMergeCoordination#clean(org.apache.hadoop
194    * .hbase.Server, org.apache.hadoop.hbase.HRegionInfo)
195    */
196   @Override
197   public void clean(final HRegionInfo hri) {
198     try {
199       // Only delete if its in expected state; could have been hijacked.
200       if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager
201           .getServer().getServerName())) {
202         ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager
203             .getServer().getServerName());
204       }
205     } catch (KeeperException.NoNodeException e) {
206       LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
207     } catch (KeeperException e) {
208       manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
209     }
210   }
211 
212   /*
213    * ZooKeeper implementation of finishRegionMergeTransaction
214    */
215   @Override
216   public void completeRegionMergeTransaction(final RegionServerServices services,
217       HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd,
218       HRegion mergedRegion) throws IOException {
219     ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
220     if (manager.getServer() == null
221         || manager.getServer().getCoordinatedStateManager() == null) {
222       return;
223     }
224     // Tell master about merge by updating zk. If we fail, abort.
225     try {
226       transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
227         manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
228 
229       long startTime = EnvironmentEdgeManager.currentTime();
230       int spins = 0;
231       // Now wait for the master to process the merge. We know it's done
232       // when the znode is deleted. The reason we keep tickling the znode is
233       // that it's possible for the master to miss an event.
234       do {
235         if (spins % 10 == 0) {
236           LOG.debug("Still waiting on the master to process the merge for "
237               + mergedRegionInfo.getEncodedName() + ", waited "
238               + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
239         }
240         Thread.sleep(100);
241         // When this returns -1 it means the znode doesn't exist
242         transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
243           manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
244         spins++;
245       } while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped()
246           && !services.isStopping());
247     } catch (Exception e) {
248       if (e instanceof InterruptedException) {
249         Thread.currentThread().interrupt();
250       }
251       throw new IOException("Failed telling master about merge "
252           + mergedRegionInfo.getEncodedName(), e);
253     }
254     // Leaving here, the mergedir with its dross will be in place but since the
255     // merge was successful, just leave it; it'll be cleaned when region_a is
256     // cleaned up by CatalogJanitor on master
257   }
258 
259   /*
260    * Zookeeper implementation of region merge confirmation
261    */
262   @Override
263   public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
264       ServerName serverName, RegionMergeDetails rmd) throws IOException {
265     transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING,
266       RS_ZK_REGION_MERGING);
267   }
268 
269   /*
270    * Zookeeper implementation of region merge processing
271    */
272   @Override
273   public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b,
274       ServerName sn, RegionMergeDetails rmd) throws IOException {
275     transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE,
276       EventType.RS_ZK_REGION_MERGING);
277   }
278 
279   /**
280    * Transitions an existing ephemeral node for the specified region which is
281    * currently in the begin state to be in the end state. Master cleans up the
282    * final MERGE znode when it reads it (or if we crash, zk will clean it up).
283    *
284    * <p>
285    * Does not transition nodes from other states. If for some reason the node
286    * could not be transitioned, the method returns -1. If the transition is
287    * successful, the version of the node after transition is updated in details.
288    *
289    * <p>
290    * This method can fail and return false for three different reasons:
291    * <ul>
292    * <li>Node for this region does not exist</li>
293    * <li>Node for this region is not in the begin state</li>
294    * <li>After verifying the begin state, update fails because of wrong version
295    * (this should never actually happen since an RS only does this transition
296    * following a transition to the begin state. If two RS are conflicting, one would
297    * fail the original transition to the begin state and not this transition)</li>
298    * </ul>
299    *
300    * <p>
301    * Does not set any watches.
302    *
303    * <p>
304    * This method should only be used by a RegionServer when merging two regions.
305    *
306    * @param merged region to be transitioned to opened
307    * @param a merging region A
308    * @param b merging region B
309    * @param serverName server event originates from
310    * @param rmd region merge details
311    * @param beginState the expected current state the node should be
312    * @param endState the state to be transition to
313    * @throws IOException
314    */
315   private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
316       ServerName serverName, RegionMergeDetails rmd, final EventType beginState,
317       final EventType endState) throws IOException {
318     ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
319     byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
320     try {
321       zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState,
322         endState, zrmd.getZnodeVersion(), payload));
323     } catch (KeeperException e) {
324       throw new IOException(e);
325     }
326   }
327 }