View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional information regarding
4    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7    * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8    * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9    * for the specific language governing permissions and limitations under the License.
10   */
11  
12  package org.apache.hadoop.hbase.coordination;
13  
14  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
15  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
16  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
17  
18  import java.io.IOException;
19  import java.util.List;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.CoordinatedStateManager;
24  import org.apache.hadoop.hbase.HRegionInfo;
25  import org.apache.hadoop.hbase.RegionTransition;
26  import org.apache.hadoop.hbase.ServerName;
27  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
28  import org.apache.hadoop.hbase.executor.EventType;
29  import org.apache.hadoop.hbase.regionserver.HRegion;
30  import org.apache.hadoop.hbase.regionserver.Region;
31  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
32  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
33  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
34  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
35  import org.apache.zookeeper.KeeperException;
36  import org.apache.zookeeper.data.Stat;
37  
38  public class ZKSplitTransactionCoordination implements SplitTransactionCoordination {
39  
40    private CoordinatedStateManager coordinationManager;
41    private final ZooKeeperWatcher watcher;
42  
43    private static final Log LOG = LogFactory.getLog(ZKSplitTransactionCoordination.class);
44  
45    public ZKSplitTransactionCoordination(CoordinatedStateManager coordinationProvider,
46        ZooKeeperWatcher watcher) {
47      this.coordinationManager = coordinationProvider;
48      this.watcher = watcher;
49    }
50  
51    /**
52     * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region. Create it
53     * ephemeral in case regionserver dies mid-split.
54     * <p>
55     * Does not transition nodes from other states. If a node already exists for this region, an
56     * Exception will be thrown.
57     * @param parent region to be created as offline
58     * @param serverName server event originates from
59     * @param hri_a daughter region
60     * @param hri_b daughter region
61     * @throws IOException
62     */
63  
64    @Override
65    public void startSplitTransaction(HRegion parent, ServerName serverName, HRegionInfo hri_a,
66        HRegionInfo hri_b) throws IOException {
67  
68      HRegionInfo region = parent.getRegionInfo();
69      try {
70  
71        LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
72            + " in PENDING_SPLIT state"));
73        byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
74        RegionTransition rt =
75            RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_SPLIT,
76              region.getRegionName(), serverName, payload);
77        String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
78        if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
79          throw new IOException("Failed create of ephemeral " + node);
80        }
81  
82      } catch (KeeperException e) {
83        throw new IOException("Failed creating PENDING_SPLIT znode on "
84            + parent.getRegionInfo().getRegionNameAsString(), e);
85      }
86  
87    }
88  
89    /**
90     * Transitions an existing ephemeral node for the specified region which is currently in the begin
91     * state to be in the end state. Master cleans up the final SPLIT znode when it reads it (or if we
92     * crash, zk will clean it up).
93     * <p>
94     * Does not transition nodes from other states. If for some reason the node could not be
95     * transitioned, the method returns -1. If the transition is successful, the version of the node
96     * after transition is returned.
97     * <p>
98     * This method can fail and return false for three different reasons:
99     * <ul>
100    * <li>Node for this region does not exist</li>
101    * <li>Node for this region is not in the begin state</li>
102    * <li>After verifying the begin state, update fails because of wrong version (this should never
103    * actually happen since an RS only does this transition following a transition to the begin
104    * state. If two RS are conflicting, one would fail the original transition to the begin state and
105    * not this transition)</li>
106    * </ul>
107    * <p>
108    * Does not set any watches.
109    * <p>
110    * This method should only be used by a RegionServer when splitting a region.
111    * @param parent region to be transitioned to opened
112    * @param a Daughter a of split
113    * @param b Daughter b of split
114    * @param serverName server event originates from
115    * @param std split transaction details
116    * @param beginState the expected current state the znode should be
117    * @param endState the state to be transition to
118    * @return version of node after transition, -1 if unsuccessful transition
119    * @throws IOException
120    */
121 
122   private int transitionSplittingNode(HRegionInfo parent, HRegionInfo a, HRegionInfo b,
123       ServerName serverName, SplitTransactionDetails std, final EventType beginState,
124       final EventType endState) throws IOException {
125     ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
126     byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
127     try {
128       return ZKAssign.transitionNode(watcher, parent, serverName, beginState, endState,
129         zstd.getZnodeVersion(), payload);
130     } catch (KeeperException e) {
131       throw new IOException(
132           "Failed transition of splitting node " + parent.getRegionNameAsString(), e);
133     }
134   }
135 
136   /**
137    * Wait for the splitting node to be transitioned from pending_split to splitting by master.
138    * That's how we are sure master has processed the event and is good with us to move on. If we
139    * don't get any update, we periodically transition the node so that master gets the callback. If
140    * the node is removed or is not in pending_split state any more, we abort the split.
141    */
142   @Override
143   public void waitForSplitTransaction(final RegionServerServices services, Region parent,
144       HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails sptd) throws IOException {
145     ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) sptd;
146 
147     // After creating the split node, wait for master to transition it
148     // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
149     // knows about it and won't transition any region which is splitting.
150     try {
151       int spins = 0;
152       Stat stat = new Stat();
153       ServerName expectedServer = coordinationManager.getServer().getServerName();
154       String node = parent.getRegionInfo().getEncodedName();
155       while (!(coordinationManager.getServer().isStopped() || services.isStopping())) {
156         if (spins % 5 == 0) {
157           LOG.debug("Still waiting for master to process " + "the pending_split for " + node);
158           SplitTransactionDetails temp = getDefaultDetails();
159           transitionSplittingNode(parent.getRegionInfo(), hri_a, hri_b, expectedServer, temp,
160             RS_ZK_REQUEST_REGION_SPLIT, RS_ZK_REQUEST_REGION_SPLIT);
161         }
162         Thread.sleep(100);
163         spins++;
164         byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
165         if (data == null) {
166           throw new IOException("Data is null, splitting node " + node + " no longer exists");
167         }
168         RegionTransition rt = RegionTransition.parseFrom(data);
169         EventType et = rt.getEventType();
170         if (et == RS_ZK_REGION_SPLITTING) {
171           ServerName serverName = rt.getServerName();
172           if (!serverName.equals(expectedServer)) {
173             throw new IOException("Splitting node " + node + " is for " + serverName + ", not us "
174                 + expectedServer);
175           }
176           byte[] payloadOfSplitting = rt.getPayload();
177           List<HRegionInfo> splittingRegions =
178               HRegionInfo.parseDelimitedFrom(payloadOfSplitting, 0, payloadOfSplitting.length);
179           assert splittingRegions.size() == 2;
180           HRegionInfo a = splittingRegions.get(0);
181           HRegionInfo b = splittingRegions.get(1);
182           if (!(hri_a.equals(a) && hri_b.equals(b))) {
183             throw new IOException("Splitting node " + node + " is for " + a + ", " + b
184                 + ", not expected daughters: " + hri_a + ", " + hri_b);
185           }
186           // Master has processed it.
187           zstd.setZnodeVersion(stat.getVersion());
188           return;
189         }
190         if (et != RS_ZK_REQUEST_REGION_SPLIT) {
191           throw new IOException("Splitting node " + node + " moved out of splitting to " + et);
192         }
193       }
194       // Server is stopping/stopped
195       throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
196     } catch (Exception e) {
197       if (e instanceof InterruptedException) {
198         Thread.currentThread().interrupt();
199       }
200       throw new IOException("Failed getting SPLITTING znode on " +
201         parent.getRegionInfo().getRegionNameAsString(), e);
202     }
203   }
204 
205   /**
206    * Finish off split transaction, transition the zknode
207    * @param services Used to online/offline regions.
208    * @param a daughter region
209    * @param b daughter region
210    * @param std split transaction details
211    * @param parent
212    * @throws IOException If thrown, transaction failed. Call
213    *  {@link org.apache.hadoop.hbase.regionserver.SplitTransaction#rollback(
214    *  Server, RegionServerServices)}
215    */
216   @Override
217   public void completeSplitTransaction(final RegionServerServices services, Region a, Region b,
218       SplitTransactionDetails std, Region parent) throws IOException {
219     ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
220     // Tell master about split by updating zk. If we fail, abort.
221     if (coordinationManager.getServer() != null) {
222       try {
223         zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
224           b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
225           RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT));
226 
227         int spins = 0;
228         // Now wait for the master to process the split. We know it's done
229         // when the znode is deleted. The reason we keep tickling the znode is
230         // that it's possible for the master to miss an event.
231         do {
232           if (spins % 10 == 0) {
233             LOG.debug("Still waiting on the master to process the split for "
234                 + parent.getRegionInfo().getEncodedName());
235           }
236           Thread.sleep(100);
237           // When this returns -1 it means the znode doesn't exist
238           zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
239             b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
240             RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT));
241           spins++;
242         } while (zstd.getZnodeVersion() != -1 && !coordinationManager.getServer().isStopped()
243             && !services.isStopping());
244       } catch (Exception e) {
245         if (e instanceof InterruptedException) {
246           Thread.currentThread().interrupt();
247         }
248         throw new IOException("Failed telling master about split", e);
249       }
250     }
251 
252     // Leaving here, the splitdir with its dross will be in place but since the
253     // split was successful, just leave it; it'll be cleaned when parent is
254     // deleted and cleaned up.
255   }
256 
257   @Override
258   public void clean(final HRegionInfo hri) {
259     try {
260       // Only delete if its in expected state; could have been hijacked.
261       if (!ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(),
262         hri.getEncodedName(), RS_ZK_REQUEST_REGION_SPLIT, coordinationManager.getServer()
263             .getServerName())) {
264         ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(), hri.getEncodedName(),
265           RS_ZK_REGION_SPLITTING, coordinationManager.getServer().getServerName());
266       }
267     } catch (KeeperException.NoNodeException e) {
268       LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
269     } catch (KeeperException e) {
270       coordinationManager.getServer().abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
271     }
272   }
273 
274   /**
275    * ZK-based implementation. Has details about whether the state transition should be reflected in
276    * ZK, as well as expected version of znode.
277    */
278   public static class ZkSplitTransactionDetails implements
279       SplitTransactionCoordination.SplitTransactionDetails {
280     private int znodeVersion;
281 
282     public ZkSplitTransactionDetails() {
283     }
284 
285     /**
286      * @return znode current version
287      */
288     public int getZnodeVersion() {
289       return znodeVersion;
290     }
291 
292     /**
293      * @param znodeVersion znode new version
294      */
295     public void setZnodeVersion(int znodeVersion) {
296       this.znodeVersion = znodeVersion;
297     }
298   }
299 
300   @Override
301   public SplitTransactionDetails getDefaultDetails() {
302     ZkSplitTransactionDetails zstd = new ZkSplitTransactionDetails();
303     zstd.setZnodeVersion(-1);
304     return zstd;
305   }
306 
307   @Override
308   public int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, ServerName sn,
309       SplitTransactionDetails std) throws IOException {
310     return transitionSplittingNode(p, hri_a, hri_b, sn, std, RS_ZK_REQUEST_REGION_SPLIT,
311       RS_ZK_REGION_SPLITTING);
312 
313   }
314 }