View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.util.Arrays;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.classification.InterfaceStability;
29  import org.apache.hadoop.hbase.errorhandling.ForeignException;
30  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
31  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
32  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
33  import org.apache.zookeeper.KeeperException;
34  
35  import com.google.protobuf.InvalidProtocolBufferException;
36  
37  /**
38   * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
39   */
40  @InterfaceAudience.Private
41  public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
42    public static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
43    private ZKProcedureUtil zkProc = null;
44    protected ProcedureCoordinator coordinator = null;  // if started this should be non-null
45  
46    ZooKeeperWatcher watcher;
47    String procedureType;
48    String coordName;
49  
50    /**
51     * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
52     * @param procedureClass procedure type name is a category for when there are multiple kinds of
53     *    procedures.-- this becomes a znode so be aware of the naming restrictions
54     * @param coordName name of the node running the coordinator
55     * @throws KeeperException if an unexpected zk error occurs
56     */
57    public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
58        String procedureClass, String coordName) throws KeeperException {
59      this.watcher = watcher;
60      this.procedureType = procedureClass;
61      this.coordName = coordName;
62    }
63  
64    /**
65     * The "acquire" phase.  The coordinator creates a new procType/acquired/ znode dir. If znodes
66     * appear, first acquire to relevant listener or sets watch waiting for notification of
67     * the acquire node
68     *
69     * @param proc the Procedure
70     * @param info data to be stored in the acquire node
71     * @param nodeNames children of the acquire phase
72     * @throws IOException if any failure occurs.
73     */
74    @Override
75    final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
76        throws IOException, IllegalArgumentException {
77      String procName = proc.getName();
78      // start watching for the abort node
79      String abortNode = zkProc.getAbortZNode(procName);
80      try {
81        // check to see if the abort node already exists
82        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
83          abort(abortNode);
84        }
85        // If we get an abort node watch triggered here, we'll go complete creating the acquired
86        // znode but then handle the acquire znode and bail out
87      } catch (KeeperException e) {
88        LOG.error("Failed to watch abort", e);
89        throw new IOException("Failed while watching abort node:" + abortNode, e);
90      }
91  
92      // create the acquire barrier
93      String acquire = zkProc.getAcquiredBarrierNode(procName);
94      LOG.debug("Creating acquire znode:" + acquire);
95      try {
96        // notify all the procedure listeners to look for the acquire node
97        byte[] data = ProtobufUtil.prependPBMagic(info);
98        ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
99        // loop through all the children of the acquire phase and watch for them
100       for (String node : nodeNames) {
101         String znode = ZKUtil.joinZNode(acquire, node);
102         LOG.debug("Watching for acquire node:" + znode);
103         if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
104           coordinator.memberAcquiredBarrier(procName, node);
105         }
106       }
107     } catch (KeeperException e) {
108       throw new IOException("Failed while creating acquire node:" + acquire, e);
109     }
110   }
111 
112   @Override
113   public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
114     String procName = proc.getName();
115     String reachedNode = zkProc.getReachedBarrierNode(procName);
116     LOG.debug("Creating reached barrier zk node:" + reachedNode);
117     try {
118       // create the reached znode and watch for the reached znodes
119       ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
120       // loop through all the children of the acquire phase and watch for them
121       for (String node : nodeNames) {
122         String znode = ZKUtil.joinZNode(reachedNode, node);
123         if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
124           byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
125           // ProtobufUtil.isPBMagicPrefix will check null
126           if (dataFromMember != null && dataFromMember.length > 0) {
127             if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
128               throw new IOException(
129                 "Failed to get data from finished node or data is illegally formatted: "
130                     + znode);
131             } else {
132               dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
133                 dataFromMember.length);
134               coordinator.memberFinishedBarrier(procName, node, dataFromMember);
135             }
136           } else {
137             coordinator.memberFinishedBarrier(procName, node, dataFromMember);
138           }
139         }
140       }
141     } catch (KeeperException e) {
142       throw new IOException("Failed while creating reached node:" + reachedNode, e);
143     } catch (InterruptedException e) {
144       throw new InterruptedIOException("Interrupted while creating reached node:" + reachedNode);
145     }
146   }
147 
148 
149   /**
150    * Delete znodes that are no longer in use.
151    */
152   @Override
153   final public void resetMembers(Procedure proc) throws IOException {
154     String procName = proc.getName();
155     boolean stillGettingNotifications = false;
156     do {
157       try {
158         LOG.debug("Attempting to clean out zk node for op:" + procName);
159         zkProc.clearZNodes(procName);
160         stillGettingNotifications = false;
161       } catch (KeeperException.NotEmptyException e) {
162         // recursive delete isn't transactional (yet) so we need to deal with cases where we get
163         // children trickling in
164         stillGettingNotifications = true;
165       } catch (KeeperException e) {
166         throw new IOException("Failed to complete reset procedure " + procName, e);
167       }
168     } while (stillGettingNotifications);
169   }
170 
171   /**
172    * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
173    * @return true if succeed, false if encountered initialization errors.
174    */
175   final public boolean start(final ProcedureCoordinator coordinator) {
176     if (this.coordinator != null) {
177       throw new IllegalStateException(
178         "ZKProcedureCoordinator already started and already has listener installed");
179     }
180     this.coordinator = coordinator;
181 
182     try {
183       this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
184         @Override
185         public void nodeCreated(String path) {
186           if (!isInProcedurePath(path)) return;
187           LOG.debug("Node created: " + path);
188           logZKTree(this.baseZNode);
189           if (isAcquiredPathNode(path)) {
190             // node wasn't present when we created the watch so zk event triggers acquire
191             coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
192               ZKUtil.getNodeName(path));
193           } else if (isReachedPathNode(path)) {
194             // node was absent when we created the watch so zk event triggers the finished barrier.
195 
196             // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
197             String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
198             String member = ZKUtil.getNodeName(path);
199             // get the data from the procedure member
200             try {
201               byte[] dataFromMember = ZKUtil.getData(watcher, path);
202               // ProtobufUtil.isPBMagicPrefix will check null
203               if (dataFromMember != null && dataFromMember.length > 0) {
204                 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
205                   ForeignException ee = new ForeignException(coordName,
206                     "Failed to get data from finished node or data is illegally formatted:"
207                         + path);
208                   coordinator.abortProcedure(procName, ee);
209                 } else {
210                   dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
211                     dataFromMember.length);
212                   LOG.debug("Finished data from procedure '" + procName
213                     + "' member '" + member + "': " + new String(dataFromMember));
214                   coordinator.memberFinishedBarrier(procName, member, dataFromMember);
215                 }
216               } else {
217                 coordinator.memberFinishedBarrier(procName, member, dataFromMember);
218               }
219             } catch (KeeperException e) {
220               ForeignException ee = new ForeignException(coordName, e);
221               coordinator.abortProcedure(procName, ee);
222             } catch (InterruptedException e) {
223               ForeignException ee = new ForeignException(coordName, e);
224               coordinator.abortProcedure(procName, ee);
225             }
226           } else if (isAbortPathNode(path)) {
227             abort(path);
228           } else {
229             LOG.debug("Ignoring created notification for node:" + path);
230           }
231         }
232       };
233       zkProc.clearChildZNodes();
234     } catch (KeeperException e) {
235       LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
236       return false;
237     }
238 
239     LOG.debug("Starting the controller for procedure member:" + coordName);
240     return true;
241   }
242 
243   /**
244    * This is the abort message being sent by the coordinator to member
245    *
246    * TODO this code isn't actually used but can be used to issue a cancellation from the
247    * coordinator.
248    */
249   @Override
250   final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
251     String procName = proc.getName();
252     LOG.debug("Aborting procedure '" + procName + "' in zk");
253     String procAbortNode = zkProc.getAbortZNode(procName);
254     try {
255       LOG.debug("Creating abort znode:" + procAbortNode);
256       String source = (ee.getSource() == null) ? coordName : ee.getSource();
257       byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
258       // first create the znode for the procedure
259       ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
260       LOG.debug("Finished creating abort node:" + procAbortNode);
261     } catch (KeeperException e) {
262       // possible that we get this error for the procedure if we already reset the zk state, but in
263       // that case we should still get an error for that procedure anyways
264       zkProc.logZKTree(zkProc.baseZNode);
265       coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
266           + " to abort procedure '" + procName + "'", new IOException(e));
267     }
268   }
269 
270   /**
271    * Receive a notification and propagate it to the local coordinator
272    * @param abortNode full znode path to the failed procedure information
273    */
274   protected void abort(String abortNode) {
275     String procName = ZKUtil.getNodeName(abortNode);
276     ForeignException ee = null;
277     try {
278       byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
279       if (!ProtobufUtil.isPBMagicPrefix(data)) {
280         LOG.warn("Got an error notification for op:" + abortNode
281             + " but we can't read the information. Killing the procedure.");
282         // we got a remote exception, but we can't describe it
283         ee = new ForeignException(coordName, "Data in abort node is illegally formatted.  ignoring content.");
284       } else {
285 
286         data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
287         ee = ForeignException.deserialize(data);
288       }
289     } catch (InvalidProtocolBufferException e) {
290       LOG.warn("Got an error notification for op:" + abortNode
291           + " but we can't read the information. Killing the procedure.");
292       // we got a remote exception, but we can't describe it
293       ee = new ForeignException(coordName, e);
294     } catch (KeeperException e) {
295       coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
296           + zkProc.getAbortZnode(), new IOException(e));
297     } catch (InterruptedException e) {
298       coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
299           + zkProc.getAbortZnode(), new IOException(e));
300       Thread.currentThread().interrupt();
301     }
302     coordinator.abortProcedure(procName, ee);
303   }
304 
305   @Override
306   final public void close() throws IOException {
307     zkProc.close();
308   }
309 
310   /**
311    * Used in testing
312    */
313   final ZKProcedureUtil getZkProcedureUtil() {
314     return zkProc;
315   }
316 }