View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.util.Arrays;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.errorhandling.ForeignException;
29  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
30  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
31  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32  import org.apache.zookeeper.KeeperException;
33  
34  import com.google.protobuf.InvalidProtocolBufferException;
35  
36  /**
37   * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
38   */
39  @InterfaceAudience.Private
40  public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
41    private static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
42    private ZKProcedureUtil zkProc = null;
43    protected ProcedureCoordinator coordinator = null;  // if started this should be non-null
44  
45    ZooKeeperWatcher watcher;
46    String procedureType;
47    String coordName;
48  
49    /**
50     * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
51     * @param procedureClass procedure type name is a category for when there are multiple kinds of
52     *    procedures.-- this becomes a znode so be aware of the naming restrictions
53     * @param coordName name of the node running the coordinator
54     * @throws KeeperException if an unexpected zk error occurs
55     */
56    public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
57        String procedureClass, String coordName) throws KeeperException {
58      this.watcher = watcher;
59      this.procedureType = procedureClass;
60      this.coordName = coordName;
61    }
62  
63    /**
64     * The "acquire" phase.  The coordinator creates a new procType/acquired/ znode dir. If znodes
65     * appear, first acquire to relevant listener or sets watch waiting for notification of
66     * the acquire node
67     *
68     * @param proc the Procedure
69     * @param info data to be stored in the acquire node
70     * @param nodeNames children of the acquire phase
71     * @throws IOException if any failure occurs.
72     */
73    @Override
74    final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
75        throws IOException, IllegalArgumentException {
76      String procName = proc.getName();
77      // start watching for the abort node
78      String abortNode = zkProc.getAbortZNode(procName);
79      try {
80        // check to see if the abort node already exists
81        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
82          abort(abortNode);
83        }
84        // If we get an abort node watch triggered here, we'll go complete creating the acquired
85        // znode but then handle the acquire znode and bail out
86      } catch (KeeperException e) {
87        String msg = "Failed while watching abort node:" + abortNode;
88        LOG.error(msg, e);
89        throw new IOException(msg, e);
90      }
91  
92      // create the acquire barrier
93      String acquire = zkProc.getAcquiredBarrierNode(procName);
94      LOG.debug("Creating acquire znode:" + acquire);
95      try {
96        // notify all the procedure listeners to look for the acquire node
97        byte[] data = ProtobufUtil.prependPBMagic(info);
98        ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
99        // loop through all the children of the acquire phase and watch for them
100       for (String node : nodeNames) {
101         String znode = ZKUtil.joinZNode(acquire, node);
102         LOG.debug("Watching for acquire node:" + znode);
103         if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
104           coordinator.memberAcquiredBarrier(procName, node);
105         }
106       }
107     } catch (KeeperException e) {
108       String msg = "Failed while creating acquire node:" + acquire;
109       LOG.error(msg, e);
110       throw new IOException(msg, e);
111     }
112   }
113 
114   @Override
115   public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
116     String procName = proc.getName();
117     String reachedNode = zkProc.getReachedBarrierNode(procName);
118     LOG.debug("Creating reached barrier zk node:" + reachedNode);
119     try {
120       // create the reached znode and watch for the reached znodes
121       ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
122       // loop through all the children of the acquire phase and watch for them
123       for (String node : nodeNames) {
124         String znode = ZKUtil.joinZNode(reachedNode, node);
125         if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
126           byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
127           // ProtobufUtil.isPBMagicPrefix will check null
128           if (dataFromMember != null && dataFromMember.length > 0) {
129             if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
130               String msg =
131                 "Failed to get data from finished node or data is illegally formatted: " + znode;
132               LOG.error(msg);
133               throw new IOException(msg);
134             } else {
135               dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
136                 dataFromMember.length);
137               coordinator.memberFinishedBarrier(procName, node, dataFromMember);
138             }
139           } else {
140             coordinator.memberFinishedBarrier(procName, node, dataFromMember);
141           }
142         }
143       }
144     } catch (KeeperException e) {
145       String msg = "Failed while creating reached node:" + reachedNode;
146       LOG.error(msg, e);
147       throw new IOException(msg, e);
148     } catch (InterruptedException e) {
149       String msg = "Interrupted while creating reached node:" + reachedNode;
150       LOG.error(msg, e);
151       throw new InterruptedIOException(msg);
152     }
153   }
154 
155 
156   /**
157    * Delete znodes that are no longer in use.
158    */
159   @Override
160   final public void resetMembers(Procedure proc) throws IOException {
161     String procName = proc.getName();
162     boolean stillGettingNotifications = false;
163     do {
164       try {
165         LOG.debug("Attempting to clean out zk node for op:" + procName);
166         zkProc.clearZNodes(procName);
167         stillGettingNotifications = false;
168       } catch (KeeperException.NotEmptyException e) {
169         // recursive delete isn't transactional (yet) so we need to deal with cases where we get
170         // children trickling in
171         stillGettingNotifications = true;
172       } catch (KeeperException e) {
173         String msg = "Failed to complete reset procedure " + procName;
174         LOG.error(msg, e);
175         throw new IOException(msg, e);
176       }
177     } while (stillGettingNotifications);
178   }
179 
180   /**
181    * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
182    * @return true if succeed, false if encountered initialization errors.
183    */
184   final public boolean start(final ProcedureCoordinator coordinator) {
185     if (this.coordinator != null) {
186       throw new IllegalStateException(
187         "ZKProcedureCoordinator already started and already has listener installed");
188     }
189     this.coordinator = coordinator;
190 
191     try {
192       this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
193         @Override
194         public void nodeCreated(String path) {
195           if (!isInProcedurePath(path)) return;
196           LOG.debug("Node created: " + path);
197           logZKTree(this.baseZNode);
198           if (isAcquiredPathNode(path)) {
199             // node wasn't present when we created the watch so zk event triggers acquire
200             coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
201               ZKUtil.getNodeName(path));
202           } else if (isReachedPathNode(path)) {
203             // node was absent when we created the watch so zk event triggers the finished barrier.
204 
205             // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
206             String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
207             String member = ZKUtil.getNodeName(path);
208             // get the data from the procedure member
209             try {
210               byte[] dataFromMember = ZKUtil.getData(watcher, path);
211               // ProtobufUtil.isPBMagicPrefix will check null
212               if (dataFromMember != null && dataFromMember.length > 0) {
213                 if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
214                   ForeignException ee = new ForeignException(coordName,
215                     "Failed to get data from finished node or data is illegally formatted:"
216                         + path);
217                   coordinator.abortProcedure(procName, ee);
218                 } else {
219                   dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
220                     dataFromMember.length);
221                   LOG.debug("Finished data from procedure '" + procName
222                     + "' member '" + member + "': " + new String(dataFromMember));
223                   coordinator.memberFinishedBarrier(procName, member, dataFromMember);
224                 }
225               } else {
226                 coordinator.memberFinishedBarrier(procName, member, dataFromMember);
227               }
228             } catch (KeeperException e) {
229               ForeignException ee = new ForeignException(coordName, e);
230               coordinator.abortProcedure(procName, ee);
231             } catch (InterruptedException e) {
232               ForeignException ee = new ForeignException(coordName, e);
233               coordinator.abortProcedure(procName, ee);
234             }
235           } else if (isAbortPathNode(path)) {
236             abort(path);
237           } else {
238             LOG.debug("Ignoring created notification for node:" + path);
239           }
240         }
241       };
242       zkProc.clearChildZNodes();
243     } catch (KeeperException e) {
244       LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
245       return false;
246     }
247 
248     LOG.debug("Starting the controller for procedure member:" + coordName);
249     return true;
250   }
251 
252   /**
253    * This is the abort message being sent by the coordinator to member
254    *
255    * TODO this code isn't actually used but can be used to issue a cancellation from the
256    * coordinator.
257    */
258   @Override
259   final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
260     String procName = proc.getName();
261     LOG.debug("Aborting procedure '" + procName + "' in zk");
262     String procAbortNode = zkProc.getAbortZNode(procName);
263     try {
264       LOG.debug("Creating abort znode:" + procAbortNode);
265       String source = (ee.getSource() == null) ? coordName : ee.getSource();
266       byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
267       // first create the znode for the procedure
268       ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
269       LOG.debug("Finished creating abort node:" + procAbortNode);
270     } catch (KeeperException e) {
271       // possible that we get this error for the procedure if we already reset the zk state, but in
272       // that case we should still get an error for that procedure anyways
273       zkProc.logZKTree(zkProc.baseZNode);
274       coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
275           + " to abort procedure '" + procName + "'", new IOException(e));
276     }
277   }
278 
279   /**
280    * Receive a notification and propagate it to the local coordinator
281    * @param abortNode full znode path to the failed procedure information
282    */
283   protected void abort(String abortNode) {
284     String procName = ZKUtil.getNodeName(abortNode);
285     ForeignException ee = null;
286     try {
287       byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
288       if (data == null || data.length == 0) {
289         // ignore
290         return;
291       } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
292         LOG.warn("Got an error notification for op:" + abortNode
293             + " but we can't read the information. Killing the procedure.");
294         // we got a remote exception, but we can't describe it
295         ee = new ForeignException(coordName,
296           "Data in abort node is illegally formatted.  ignoring content.");
297       } else {
298 
299         data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
300         ee = ForeignException.deserialize(data);
301       }
302     } catch (InvalidProtocolBufferException e) {
303       LOG.warn("Got an error notification for op:" + abortNode
304           + " but we can't read the information. Killing the procedure.");
305       // we got a remote exception, but we can't describe it
306       ee = new ForeignException(coordName, e);
307     } catch (KeeperException e) {
308       coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
309           + zkProc.getAbortZnode(), new IOException(e));
310     } catch (InterruptedException e) {
311       coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
312           + zkProc.getAbortZnode(), new IOException(e));
313       Thread.currentThread().interrupt();
314     }
315     coordinator.abortProcedure(procName, ee);
316   }
317 
318   @Override
319   final public void close() throws IOException {
320     zkProc.close();
321   }
322 
323   /**
324    * Used in testing
325    */
326   final ZKProcedureUtil getZkProcedureUtil() {
327     return zkProc;
328   }
329 }