View Javadoc

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
11  
12  package org.apache.hadoop.hbase.coordination;
13  
14  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
15  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
16  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
17  
18  import java.io.IOException;
19  import java.util.List;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.CoordinatedStateManager;
24  import org.apache.hadoop.hbase.HRegionInfo;
25  import org.apache.hadoop.hbase.RegionTransition;
26  import org.apache.hadoop.hbase.ServerName;
27  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
28  import org.apache.hadoop.hbase.executor.EventType;
29  import org.apache.hadoop.hbase.regionserver.HRegion;
30  import org.apache.hadoop.hbase.regionserver.Region;
31  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
32  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
33  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
34  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
35  import org.apache.zookeeper.KeeperException;
36  import org.apache.zookeeper.data.Stat;
37  
38  public class ZKSplitTransactionCoordination implements SplitTransactionCoordination {
39  
40    private CoordinatedStateManager coordinationManager;
41    private final ZooKeeperWatcher watcher;
42  
43    private static final Log LOG = LogFactory.getLog(ZKSplitTransactionCoordination.class);
44  
45    public ZKSplitTransactionCoordination(CoordinatedStateManager coordinationProvider,
46        ZooKeeperWatcher watcher) {
47      this.coordinationManager = coordinationProvider;
48      this.watcher = watcher;
49    }
50  
51    /**
52     * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region. Create it
53     * ephemeral in case regionserver dies mid-split.
54     * <p>
55     * Does not transition nodes from other states. If a node already exists for this region, an
56     * Exception will be thrown.
57     * @param parent region to be created as offline
58     * @param serverName server event originates from
59     * @param hri_a daughter region
60     * @param hri_b daughter region
61     * @throws IOException
62     */
63  
64    @Override
65    public void startSplitTransaction(HRegion parent, ServerName serverName, HRegionInfo hri_a,
66        HRegionInfo hri_b) throws IOException {
67  
68      HRegionInfo region = parent.getRegionInfo();
69      try {
70  
71        LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
72            + " in PENDING_SPLIT state"));
73        byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
74        RegionTransition rt =
75            RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_SPLIT,
76              region.getRegionName(), serverName, payload);
77        String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
78        if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
79          throw new IOException("Failed create of ephemeral " + node);
80        }
81  
82      } catch (KeeperException e) {
83        throw new IOException("Failed creating PENDING_SPLIT znode on "
84            + parent.getRegionInfo().getRegionNameAsString(), e);
85      }
86  
87    }
88  
89    /**
90     * Transitions an existing ephemeral node for the specified region which is currently in the begin
91     * state to be in the end state. Master cleans up the final SPLIT znode when it reads it (or if we
92     * crash, zk will clean it up).
93     * <p>
94     * Does not transition nodes from other states. If for some reason the node could not be
95     * transitioned, the method returns -1. If the transition is successful, the version of the node
96     * after transition is returned.
97     * <p>
98     * This method can fail and return false for three different reasons:
99     * <ul>
100    * <li>Node for this region does not exist</li>
101    * <li>Node for this region is not in the begin state</li>
102    * <li>After verifying the begin state, update fails because of wrong version (this should never
103    * actually happen since an RS only does this transition following a transition to the begin
104    * state. If two RS are conflicting, one would fail the original transition to the begin state and
105    * not this transition)</li>
106    * </ul>
107    * <p>
108    * Does not set any watches.
109    * <p>
110    * This method should only be used by a RegionServer when splitting a region.
111    * @param parent region to be transitioned to opened
112    * @param a Daughter a of split
113    * @param b Daughter b of split
114    * @param serverName server event originates from
115    * @param std split transaction details
116    * @param beginState the expected current state the znode should be
117    * @param endState the state to be transition to
118    * @return version of node after transition, -1 if unsuccessful transition
119    * @throws IOException
120    */
121 
122   private int transitionSplittingNode(HRegionInfo parent, HRegionInfo a, HRegionInfo b,
123       ServerName serverName, SplitTransactionDetails std, final EventType beginState,
124       final EventType endState) throws IOException {
125     ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
126     byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
127     try {
128       return ZKAssign.transitionNode(watcher, parent, serverName, beginState, endState,
129         zstd.getZnodeVersion(), payload);
130     } catch (KeeperException e) {
131       throw new IOException(
132           "Failed transition of splitting node " + parent.getRegionNameAsString(), e);
133     }
134   }
135 
136   /**
137    * Wait for the splitting node to be transitioned from pending_split to splitting by master.
138    * That's how we are sure master has processed the event and is good with us to move on. If we
139    * don't get any update, we periodically transition the node so that master gets the callback. If
140    * the node is removed or is not in pending_split state any more, we abort the split.
141    */
142   @Override
143   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
144     justification="Intended")
145   public void waitForSplitTransaction(final RegionServerServices services, Region parent,
146       HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails sptd) throws IOException {
147     ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) sptd;
148 
149     // After creating the split node, wait for master to transition it
150     // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
151     // knows about it and won't transition any region which is splitting.
152     try {
153       int spins = 0;
154       Stat stat = new Stat();
155       ServerName expectedServer = coordinationManager.getServer().getServerName();
156       String node = parent.getRegionInfo().getEncodedName();
157       while (!(coordinationManager.getServer().isStopped() || services.isStopping())) {
158         if (spins % 5 == 0) {
159           LOG.debug("Still waiting for master to process " + "the pending_split for " + node);
160           SplitTransactionDetails temp = getDefaultDetails();
161           transitionSplittingNode(parent.getRegionInfo(), hri_a, hri_b, expectedServer, temp,
162             RS_ZK_REQUEST_REGION_SPLIT, RS_ZK_REQUEST_REGION_SPLIT);
163         }
164         Thread.sleep(100);
165         spins++;
166         byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
167         if (data == null) {
168           throw new IOException("Data is null, splitting node " + node + " no longer exists");
169         }
170         RegionTransition rt = RegionTransition.parseFrom(data);
171         EventType et = rt.getEventType();
172         if (et == RS_ZK_REGION_SPLITTING) {
173           ServerName serverName = rt.getServerName();
174           if (!serverName.equals(expectedServer)) {
175             throw new IOException("Splitting node " + node + " is for " + serverName + ", not us "
176                 + expectedServer);
177           }
178           byte[] payloadOfSplitting = rt.getPayload();
179           List<HRegionInfo> splittingRegions =
180               HRegionInfo.parseDelimitedFrom(payloadOfSplitting, 0, payloadOfSplitting.length);
181           assert splittingRegions.size() == 2;
182           HRegionInfo a = splittingRegions.get(0);
183           HRegionInfo b = splittingRegions.get(1);
184           if (!(hri_a.equals(a) && hri_b.equals(b))) {
185             throw new IOException("Splitting node " + node + " is for " + a + ", " + b
186                 + ", not expected daughters: " + hri_a + ", " + hri_b);
187           }
188           // Master has processed it.
189           zstd.setZnodeVersion(stat.getVersion());
190           return;
191         }
192         if (et != RS_ZK_REQUEST_REGION_SPLIT) {
193           throw new IOException("Splitting node " + node + " moved out of splitting to " + et);
194         }
195       }
196       // Server is stopping/stopped
197       throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
198     } catch (Exception e) {
199       if (e instanceof InterruptedException) {
200         Thread.currentThread().interrupt();
201       }
202       throw new IOException("Failed getting SPLITTING znode on " +
203         parent.getRegionInfo().getRegionNameAsString(), e);
204     }
205   }
206 
207   /**
208    * Finish off split transaction, transition the zknode
209    * @param services Used to online/offline regions.
210    * @param a daughter region
211    * @param b daughter region
212    * @param std split transaction details
213    * @param parent
214    * @throws IOException If thrown, transaction failed. Call
215    *  {@link org.apache.hadoop.hbase.regionserver.SplitTransaction#rollback(
216    *  Server, RegionServerServices)}
217    */
218   @Override
219   public void completeSplitTransaction(final RegionServerServices services, Region a, Region b,
220       SplitTransactionDetails std, Region parent) throws IOException {
221     ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
222     // Tell master about split by updating zk. If we fail, abort.
223     if (coordinationManager.getServer() != null) {
224       try {
225         zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
226           b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
227           RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT));
228 
229         int spins = 0;
230         // Now wait for the master to process the split. We know it's done
231         // when the znode is deleted. The reason we keep tickling the znode is
232         // that it's possible for the master to miss an event.
233         do {
234           if (spins % 10 == 0) {
235             LOG.debug("Still waiting on the master to process the split for "
236                 + parent.getRegionInfo().getEncodedName());
237           }
238           Thread.sleep(100);
239           // When this returns -1 it means the znode doesn't exist
240           zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
241             b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
242             RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT));
243           spins++;
244         } while (zstd.getZnodeVersion() != -1 && !coordinationManager.getServer().isStopped()
245             && !services.isStopping());
246       } catch (Exception e) {
247         if (e instanceof InterruptedException) {
248           Thread.currentThread().interrupt();
249         }
250         throw new IOException("Failed telling master about split", e);
251       }
252     }
253 
254     // Leaving here, the splitdir with its dross will be in place but since the
255     // split was successful, just leave it; it'll be cleaned when parent is
256     // deleted and cleaned up.
257   }
258 
259   @Override
260   public void clean(final HRegionInfo hri) {
261     try {
262       // Only delete if its in expected state; could have been hijacked.
263       if (!ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(),
264         hri.getEncodedName(), RS_ZK_REQUEST_REGION_SPLIT, coordinationManager.getServer()
265             .getServerName())) {
266         ZKAssign.deleteNode(coordinationManager.getServer().getZooKeeper(), hri.getEncodedName(),
267           RS_ZK_REGION_SPLITTING, coordinationManager.getServer().getServerName());
268       }
269     } catch (KeeperException.NoNodeException e) {
270       LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
271     } catch (KeeperException e) {
272       coordinationManager.getServer().abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
273     }
274   }
275 
276   /**
277    * ZK-based implementation. Has details about whether the state transition should be reflected in
278    * ZK, as well as expected version of znode.
279    */
280   public static class ZkSplitTransactionDetails implements
281       SplitTransactionCoordination.SplitTransactionDetails {
282     private int znodeVersion;
283 
284     public ZkSplitTransactionDetails() {
285     }
286 
287     /**
288      * @return znode current version
289      */
290     public int getZnodeVersion() {
291       return znodeVersion;
292     }
293 
294     /**
295      * @param znodeVersion znode new version
296      */
297     public void setZnodeVersion(int znodeVersion) {
298       this.znodeVersion = znodeVersion;
299     }
300   }
301 
302   @Override
303   public SplitTransactionDetails getDefaultDetails() {
304     ZkSplitTransactionDetails zstd = new ZkSplitTransactionDetails();
305     zstd.setZnodeVersion(-1);
306     return zstd;
307   }
308 
309   @Override
310   public int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, ServerName sn,
311       SplitTransactionDetails std) throws IOException {
312     return transitionSplittingNode(p, hri_a, hri_b, sn, std, RS_ZK_REQUEST_REGION_SPLIT,
313       RS_ZK_REGION_SPLITTING);
314 
315   }
316 }