View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.Arrays;
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.classification.InterfaceAudience;
27  import org.apache.hadoop.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.errorhandling.ForeignException;
29  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32  import org.apache.zookeeper.KeeperException;
33  
34  import com.google.protobuf.InvalidProtocolBufferException;
35  
36  /**
37   * ZooKeeper based controller for a procedure member.
38   * <p>
39   * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member,
40   * since each procedure type is bound to a single set of znodes. You can have multiple
41   * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member
42   * name, but each individual rpcs is still bound to a single member name (and since they are
43   * used to determine global progress, its important to not get this wrong).
44   * <p>
45   * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
46   * time (as long as they have different types), from the same controller, but the same node name
47   * must be used for each procedure (though there is no conflict between the two procedure as long
48   * as they have distinct names).
49   * <p>
50   * There is no real error recovery with this mechanism currently -- if any the coordinator fails,
51   * its re-initialization will delete the znodes and require all in progress subprocedures to start
52   * anew.
53   */
54  @InterfaceAudience.Public
55  @InterfaceStability.Evolving
56  public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
57    private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
58  
59    private final ZKProcedureUtil zkController;
60  
61    protected ProcedureMember member;
62    private String memberName;
63  
64    /**
65     * Must call {@link #start(String, ProcedureMember)} before this can be used.
66     * @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
67     *          {@link #close()}.
68     * @param procType name of the znode describing the procedure type
69     * @throws KeeperException if we can't reach zookeeper
70     */
71    public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
72        throws KeeperException {
73      this.zkController = new ZKProcedureUtil(watcher, procType) {
74        @Override
75        public void nodeCreated(String path) {
76          if (!isInProcedurePath(path)) {
77            return;
78          }
79  
80          LOG.info("Received created event:" + path);
81          // if it is a simple start/end/abort then we just rewatch the node
82          if (isAcquiredNode(path)) {
83            waitForNewProcedures();
84            return;
85          } else if (isAbortNode(path)) {
86            watchForAbortedProcedures();
87            return;
88          }
89          String parent = ZKUtil.getParent(path);
90          // if its the end barrier, the procedure can be completed
91          if (isReachedNode(parent)) {
92            receivedReachedGlobalBarrier(path);
93            return;
94          } else if (isAbortNode(parent)) {
95            abort(path);
96            return;
97          } else if (isAcquiredNode(parent)) {
98            startNewSubprocedure(path);
99          } else {
100           LOG.debug("Ignoring created notification for node:" + path);
101         }
102       }
103 
104       @Override
105       public void nodeChildrenChanged(String path) {
106         if (path.equals(this.acquiredZnode)) {
107           LOG.info("Received procedure start children changed event: " + path);
108           waitForNewProcedures();
109         } else if (path.equals(this.abortZnode)) {
110           LOG.info("Received procedure abort children changed event: " + path);
111           watchForAbortedProcedures();
112         }
113       }
114     };
115   }
116 
117   public ZKProcedureUtil getZkController() {
118     return zkController;
119   }
120 
121   @Override
122   public String getMemberName() {
123     return memberName;
124   }
125 
126   /**
127    * Pass along the procedure global barrier notification to any listeners
128    * @param path full znode path that cause the notification
129    */
130   private void receivedReachedGlobalBarrier(String path) {
131     LOG.debug("Recieved reached global barrier:" + path);
132     String procName = ZKUtil.getNodeName(path);
133     this.member.receivedReachedGlobalBarrier(procName);
134   }
135 
136   private void watchForAbortedProcedures() {
137     LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
138     try {
139       // this is the list of the currently aborted procedues
140       for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
141         zkController.getAbortZnode())) {
142         String abortNode = ZKUtil.joinZNode(zkController.getAbortZnode(), node);
143         abort(abortNode);
144       }
145     } catch (KeeperException e) {
146       member.controllerConnectionFailure("Failed to list children for abort node:"
147           + zkController.getAbortZnode(), new IOException(e));
148     }
149   }
150 
151   private void waitForNewProcedures() {
152     // watch for new procedues that we need to start subprocedures for
153     LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
154     List<String> runningProcedures = null;
155     try {
156       runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
157         zkController.getAcquiredBarrier());
158       if (runningProcedures == null) {
159         LOG.debug("No running procedures.");
160         return;
161       }
162     } catch (KeeperException e) {
163       member.controllerConnectionFailure("General failure when watching for new procedures",
164         new IOException(e));
165     }
166     if (runningProcedures == null) {
167       LOG.debug("No running procedures.");
168       return;
169     }
170     for (String procName : runningProcedures) {
171       // then read in the procedure information
172       String path = ZKUtil.joinZNode(zkController.getAcquiredBarrier(), procName);
173       startNewSubprocedure(path);
174     }
175   }
176 
177   /**
178    * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
179    * <p>
180    * Will attempt to create the same procedure multiple times if an procedure znode with the same
181    * name is created. It is left up the coordinator to ensure this doesn't occur.
182    * @param path full path to the znode for the procedure to start
183    */
184   private synchronized void startNewSubprocedure(String path) {
185     LOG.debug("Found procedure znode: " + path);
186     String opName = ZKUtil.getNodeName(path);
187     // start watching for an abort notification for the procedure
188     String abortZNode = zkController.getAbortZNode(opName);
189     try {
190       if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
191         LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
192         return;
193       }
194     } catch (KeeperException e) {
195       member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
196           + ") for procedure :" + opName, new IOException(e));
197       return;
198     }
199 
200     // get the data for the procedure
201     Subprocedure subproc = null;
202     try {
203       byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
204       LOG.debug("start proc data length is " + data.length);
205       if (!ProtobufUtil.isPBMagicPrefix(data)) {
206         String msg = "Data in for starting procuedure " + opName + " is illegally formatted. "
207             + "Killing the procedure.";
208         LOG.error(msg);
209         throw new IllegalArgumentException(msg);
210       }
211       data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
212       LOG.debug("Found data for znode:" + path);
213       subproc = member.createSubprocedure(opName, data);
214       member.submitSubprocedure(subproc);
215     } catch (IllegalArgumentException iae ) {
216       LOG.error("Illegal argument exception", iae);
217       sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
218     } catch (IllegalStateException ise) {
219       LOG.error("Illegal state exception ", ise);
220       sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
221     } catch (KeeperException e) {
222       member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
223         new IOException(e));
224     }
225   }
226 
227   /**
228    * This attempts to create an acquired state znode for the procedure (snapshot name).
229    *
230    * It then looks for the reached znode to trigger in-barrier execution.  If not present we
231    * have a watcher, if present then trigger the in-barrier action.
232    */
233   @Override
234   public void sendMemberAcquired(Subprocedure sub) throws IOException {
235     String procName = sub.getName();
236     try {
237       LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
238           + ") in zk");
239       String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
240         zkController, procName), memberName);
241       ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
242 
243       // watch for the complete node for this snapshot
244       String reachedBarrier = zkController.getReachedBarrierNode(procName);
245       LOG.debug("Watch for global barrier reached:" + reachedBarrier);
246       if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
247         receivedReachedGlobalBarrier(reachedBarrier);
248       }
249     } catch (KeeperException e) {
250       member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
251           + procName + " and member: " + memberName, new IOException(e));
252     }
253   }
254 
255   /**
256    * This acts as the ack for a completed snapshot
257    */
258   @Override
259   public void sendMemberCompleted(Subprocedure sub) throws IOException {
260     String procName = sub.getName();
261     LOG.debug("Marking procedure  '" + procName + "' completed for member '" + memberName
262         + "' in zk");
263     String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
264     try {
265       ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath);
266     } catch (KeeperException e) {
267       member.controllerConnectionFailure("Failed to post zk node:" + joinPath
268           + " to join procedure barrier.", new IOException(e));
269     }
270   }
271 
272   /**
273    * This should be called by the member and should write a serialized root cause exception as
274    * to the abort znode.
275    */
276   @Override
277   public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
278     if (sub == null) {
279       LOG.error("Failed due to null subprocedure", ee);
280       return;
281     }
282     String procName = sub.getName();
283     LOG.debug("Aborting procedure (" + procName + ") in zk");
284     String procAbortZNode = zkController.getAbortZNode(procName);
285     try {
286       String source = (ee.getSource() == null) ? memberName: ee.getSource();
287       byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
288       ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
289       LOG.debug("Finished creating abort znode:" + procAbortZNode);
290     } catch (KeeperException e) {
291       // possible that we get this error for the procedure if we already reset the zk state, but in
292       // that case we should still get an error for that procedure anyways
293       zkController.logZKTree(zkController.getBaseZnode());
294       member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
295           + " to abort procedure", new IOException(e));
296     }
297   }
298 
299   /**
300    * Pass along the found abort notification to the listener
301    * @param abortZNode full znode path to the failed procedure information
302    */
303   protected void abort(String abortZNode) {
304     LOG.debug("Aborting procedure member for znode " + abortZNode);
305     String opName = ZKUtil.getNodeName(abortZNode);
306     try {
307       byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
308 
309       // figure out the data we need to pass
310       ForeignException ee;
311       try {
312         if (!ProtobufUtil.isPBMagicPrefix(data)) {
313           String msg = "Illegally formatted data in abort node for proc " + opName
314               + ".  Killing the procedure.";
315           LOG.error(msg);
316           // we got a remote exception, but we can't describe it so just return exn from here
317           ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
318         } else {
319           data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
320           ee = ForeignException.deserialize(data);
321         }
322       } catch (InvalidProtocolBufferException e) {
323         LOG.warn("Got an error notification for op:" + opName
324             + " but we can't read the information. Killing the procedure.");
325         // we got a remote exception, but we can't describe it so just return exn from here
326         ee = new ForeignException(getMemberName(), e);
327       }
328 
329       this.member.receiveAbortProcedure(opName, ee);
330     } catch (KeeperException e) {
331       member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
332           + zkController.getAbortZnode(), new IOException(e));
333     }
334   }
335 
336   public void start(final String memberName, final ProcedureMember listener) {
337     LOG.debug("Starting procedure member '" + memberName + "'");
338     this.member = listener;
339     this.memberName = memberName;
340     watchForAbortedProcedures();
341     waitForNewProcedures();
342   }
343 
344   @Override
345   public void close() throws IOException {
346     zkController.close();
347   }
348 
349 }