View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coordination;
21  
22  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
23  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
24  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
25  
26  import java.io.IOException;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.CoordinatedStateManager;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.RegionTransition;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.executor.EventType;
36  import org.apache.hadoop.hbase.regionserver.HRegion;
37  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
40  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42  import org.apache.zookeeper.KeeperException;
43  import org.apache.zookeeper.data.Stat;
44  
45  public class ZkRegionMergeCoordination implements RegionMergeCoordination {
46  
47    private CoordinatedStateManager manager;
48    private final ZooKeeperWatcher watcher;
49  
50    private static final Log LOG = LogFactory.getLog(ZkRegionMergeCoordination.class);
51  
52    public ZkRegionMergeCoordination(CoordinatedStateManager manager,
53        ZooKeeperWatcher watcher) {
54      this.manager = manager;
55      this.watcher = watcher;
56    }
57  
58    /**
59     * ZK-based implementation. Has details about whether the state transition should be reflected in
60     * ZK, as well as expected version of znode.
61     */
62    public static class ZkRegionMergeDetails implements RegionMergeCoordination.RegionMergeDetails {
63      private int znodeVersion;
64  
65      public ZkRegionMergeDetails() {
66      }
67  
68      public int getZnodeVersion() {
69        return znodeVersion;
70      }
71  
72      public void setZnodeVersion(int znodeVersion) {
73        this.znodeVersion = znodeVersion;
74      }
75    }
76  
77    @Override
78    public RegionMergeDetails getDefaultDetails() {
79      ZkRegionMergeDetails zstd = new ZkRegionMergeDetails();
80      zstd.setZnodeVersion(-1);
81      return zstd;
82    }
83  
84    /**
85     * Wait for the merging node to be transitioned from pending_merge
86     * to merging by master. That's how we are sure master has processed
87     * the event and is good with us to move on. If we don't get any update,
88     * we periodically transition the node so that master gets the callback.
89     * If the node is removed or is not in pending_merge state any more,
90     * we abort the merge.
91     * @throws IOException
92     */
93  
94    @Override
95    public void waitForRegionMergeTransaction(RegionServerServices services,
96        HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details)
97        throws IOException {
98      try {
99        int spins = 0;
100       Stat stat = new Stat();
101       ServerName expectedServer = manager.getServer().getServerName();
102       String node = mergedRegionInfo.getEncodedName();
103       ZkRegionMergeDetails zdetails = (ZkRegionMergeDetails) details;
104       while (!(manager.getServer().isStopped() || services.isStopping())) {
105         if (spins % 5 == 0) {
106           LOG.debug("Still waiting for master to process " + "the pending_merge for " + node);
107           ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) getDefaultDetails();
108           transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(),
109             region_b.getRegionInfo(), expectedServer, zrmd, RS_ZK_REQUEST_REGION_MERGE,
110             RS_ZK_REQUEST_REGION_MERGE);
111         }
112         Thread.sleep(100);
113         spins++;
114         byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
115         if (data == null) {
116           throw new IOException("Data is null, merging node " + node + " no longer exists");
117         }
118         RegionTransition rt = RegionTransition.parseFrom(data);
119         EventType et = rt.getEventType();
120         if (et == RS_ZK_REGION_MERGING) {
121           ServerName serverName = rt.getServerName();
122           if (!serverName.equals(expectedServer)) {
123             throw new IOException("Merging node " + node + " is for " + serverName + ", not us "
124                 + expectedServer);
125           }
126           byte[] payloadOfMerging = rt.getPayload();
127           List<HRegionInfo> mergingRegions =
128               HRegionInfo.parseDelimitedFrom(payloadOfMerging, 0, payloadOfMerging.length);
129           assert mergingRegions.size() == 3;
130           HRegionInfo a = mergingRegions.get(1);
131           HRegionInfo b = mergingRegions.get(2);
132           HRegionInfo hri_a = region_a.getRegionInfo();
133           HRegionInfo hri_b = region_b.getRegionInfo();
134           if (!(hri_a.equals(a) && hri_b.equals(b))) {
135             throw new IOException("Merging node " + node + " is for " + a + ", " + b
136                 + ", not expected regions: " + hri_a + ", " + hri_b);
137           }
138           // Master has processed it.
139           zdetails.setZnodeVersion(stat.getVersion());
140           return;
141         }
142         if (et != RS_ZK_REQUEST_REGION_MERGE) {
143           throw new IOException("Merging node " + node + " moved out of merging to " + et);
144         }
145       }
146       // Server is stopping/stopped
147       throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
148     } catch (Exception e) {
149       if (e instanceof InterruptedException) {
150         Thread.currentThread().interrupt();
151       }
152       throw new IOException("Failed getting MERGING znode on "
153           + mergedRegionInfo.getRegionNameAsString(), e);
154     }
155   }
156 
157   /**
158    * Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
159    * Create it ephemeral in case regionserver dies mid-merge.
160    *
161    * <p>
162    * Does not transition nodes from other states. If a node already exists for
163    * this region, a {@link org.apache.zookeeper.KeeperException.NodeExistsException} will be thrown.
164    *
165    * @param region region to be created as offline
166    * @param serverName server event originates from
167    * @throws IOException
168    */
169   @Override
170   public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName,
171       final HRegionInfo a, final HRegionInfo b) throws IOException {
172     LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
173         + " in PENDING_MERGE state"));
174     byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
175     RegionTransition rt =
176         RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(),
177           serverName, payload);
178     String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
179     try {
180       if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
181         throw new IOException("Failed create of ephemeral " + node);
182       }
183     } catch (KeeperException e) {
184       throw new IOException(e);
185     }
186   }
187 
188   /*
189    * (non-Javadoc)
190    * @see
191    * org.apache.hadoop.hbase.regionserver.coordination.RegionMergeCoordination#clean(org.apache.hadoop
192    * .hbase.Server, org.apache.hadoop.hbase.HRegionInfo)
193    */
194   @Override
195   public void clean(final HRegionInfo hri) {
196     try {
197       // Only delete if its in expected state; could have been hijacked.
198       if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager
199           .getServer().getServerName())) {
200         ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager
201             .getServer().getServerName());
202       }
203     } catch (KeeperException.NoNodeException e) {
204       LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
205     } catch (KeeperException e) {
206       manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
207     }
208   }
209 
210   /*
211    * ZooKeeper implementation of finishRegionMergeTransaction
212    */
213   @Override
214   public void completeRegionMergeTransaction(final RegionServerServices services,
215       HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd,
216       HRegion mergedRegion) throws IOException {
217     ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
218     if (manager.getServer() == null
219         || manager.getServer().getCoordinatedStateManager() == null) {
220       return;
221     }
222     // Tell master about merge by updating zk. If we fail, abort.
223     try {
224       transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
225         manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
226 
227       long startTime = EnvironmentEdgeManager.currentTime();
228       int spins = 0;
229       // Now wait for the master to process the merge. We know it's done
230       // when the znode is deleted. The reason we keep tickling the znode is
231       // that it's possible for the master to miss an event.
232       do {
233         if (spins % 10 == 0) {
234           LOG.debug("Still waiting on the master to process the merge for "
235               + mergedRegionInfo.getEncodedName() + ", waited "
236               + (EnvironmentEdgeManager.currentTime() - startTime) + "ms");
237         }
238         Thread.sleep(100);
239         // When this returns -1 it means the znode doesn't exist
240         transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
241           manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
242         spins++;
243       } while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped()
244           && !services.isStopping());
245     } catch (Exception e) {
246       if (e instanceof InterruptedException) {
247         Thread.currentThread().interrupt();
248       }
249       throw new IOException("Failed telling master about merge "
250           + mergedRegionInfo.getEncodedName(), e);
251     }
252     // Leaving here, the mergedir with its dross will be in place but since the
253     // merge was successful, just leave it; it'll be cleaned when region_a is
254     // cleaned up by CatalogJanitor on master
255   }
256 
257   /*
258    * Zookeeper implementation of region merge confirmation
259    */
260   @Override
261   public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
262       ServerName serverName, RegionMergeDetails rmd) throws IOException {
263     transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING,
264       RS_ZK_REGION_MERGING);
265   }
266 
267   /*
268    * Zookeeper implementation of region merge processing
269    */
270   @Override
271   public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b,
272       ServerName sn, RegionMergeDetails rmd) throws IOException {
273     transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE,
274       EventType.RS_ZK_REGION_MERGING);
275   }
276 
277   /**
278    * Transitions an existing ephemeral node for the specified region which is
279    * currently in the begin state to be in the end state. Master cleans up the
280    * final MERGE znode when it reads it (or if we crash, zk will clean it up).
281    *
282    * <p>
283    * Does not transition nodes from other states. If for some reason the node
284    * could not be transitioned, the method returns -1. If the transition is
285    * successful, the version of the node after transition is updated in details.
286    *
287    * <p>
288    * This method can fail and return false for three different reasons:
289    * <ul>
290    * <li>Node for this region does not exist</li>
291    * <li>Node for this region is not in the begin state</li>
292    * <li>After verifying the begin state, update fails because of wrong version
293    * (this should never actually happen since an RS only does this transition
294    * following a transition to the begin state. If two RS are conflicting, one would
295    * fail the original transition to the begin state and not this transition)</li>
296    * </ul>
297    *
298    * <p>
299    * Does not set any watches.
300    *
301    * <p>
302    * This method should only be used by a RegionServer when merging two regions.
303    *
304    * @param merged region to be transitioned to opened
305    * @param a merging region A
306    * @param b merging region B
307    * @param serverName server event originates from
308    * @param rmd region merge details
309    * @param beginState the expected current state the node should be
310    * @param endState the state to be transition to
311    * @throws IOException
312    */
313   private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
314       ServerName serverName, RegionMergeDetails rmd, final EventType beginState,
315       final EventType endState) throws IOException {
316     ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
317     byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
318     try {
319       zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState,
320         endState, zrmd.getZnodeVersion(), payload));
321     } catch (KeeperException e) {
322       throw new IOException(e);
323     }
324   }
325 }