001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023
024import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.hadoop.hbase.errorhandling.ForeignException;
027import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.zookeeper.ZKUtil;
030import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
031import org.apache.zookeeper.KeeperException;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * ZooKeeper based controller for a procedure member.
037 * <p>
038 * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member,
039 * since each procedure type is bound to a single set of znodes. You can have multiple
040 * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member
041 * name, but each individual rpcs is still bound to a single member name (and since they are
042 * used to determine global progress, its important to not get this wrong).
043 * <p>
044 * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
045 * time (as long as they have different types), from the same controller, but the same node name
046 * must be used for each procedure (though there is no conflict between the two procedure as long
047 * as they have distinct names).
048 * <p>
049 * There is no real error recovery with this mechanism currently -- if any the coordinator fails,
050 * its re-initialization will delete the znodes and require all in progress subprocedures to start
051 * anew.
052 */
053@InterfaceAudience.Private
054public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
055  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureMemberRpcs.class);
056
057  private final ZKProcedureUtil zkController;
058
059  protected ProcedureMember member;
060  private String memberName;
061
062  /**
063   * Must call {@link #start(String, ProcedureMember)} before this can be used.
064   * @param watcher {@link ZKWatcher} to be owned by <tt>this</tt>. Closed via
065   *          {@link #close()}.
066   * @param procType name of the znode describing the procedure type
067   * @throws KeeperException if we can't reach zookeeper
068   */
069  public ZKProcedureMemberRpcs(final ZKWatcher watcher, final String procType)
070      throws KeeperException {
071    this.zkController = new ZKProcedureUtil(watcher, procType) {
072      @Override
073      public void nodeCreated(String path) {
074        if (!isInProcedurePath(path)) {
075          return;
076        }
077
078        LOG.info("Received created event:" + path);
079        // if it is a simple start/end/abort then we just rewatch the node
080        if (isAcquiredNode(path)) {
081          waitForNewProcedures();
082          return;
083        } else if (isAbortNode(path)) {
084          watchForAbortedProcedures();
085          return;
086        }
087        String parent = ZKUtil.getParent(path);
088        // if its the end barrier, the procedure can be completed
089        if (isReachedNode(parent)) {
090          receivedReachedGlobalBarrier(path);
091          return;
092        } else if (isAbortNode(parent)) {
093          abort(path);
094          return;
095        } else if (isAcquiredNode(parent)) {
096          startNewSubprocedure(path);
097        } else {
098          LOG.debug("Ignoring created notification for node:" + path);
099        }
100      }
101
102      @Override
103      public void nodeChildrenChanged(String path) {
104        if (path.equals(this.acquiredZnode)) {
105          LOG.info("Received procedure start children changed event: " + path);
106          waitForNewProcedures();
107        } else if (path.equals(this.abortZnode)) {
108          LOG.info("Received procedure abort children changed event: " + path);
109          watchForAbortedProcedures();
110        }
111      }
112    };
113  }
114
115  public ZKProcedureUtil getZkController() {
116    return zkController;
117  }
118
119  @Override
120  public String getMemberName() {
121    return memberName;
122  }
123
124  /**
125   * Pass along the procedure global barrier notification to any listeners
126   * @param path full znode path that cause the notification
127   */
128  private void receivedReachedGlobalBarrier(String path) {
129    LOG.debug("Received reached global barrier:" + path);
130    String procName = ZKUtil.getNodeName(path);
131    this.member.receivedReachedGlobalBarrier(procName);
132  }
133
134  private void watchForAbortedProcedures() {
135    LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
136    try {
137      // this is the list of the currently aborted procedues
138      List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
139                   zkController.getAbortZnode());
140      if (children == null || children.isEmpty()) {
141        return;
142      }
143      for (String node : children) {
144        String abortNode = ZNodePaths.joinZNode(zkController.getAbortZnode(), node);
145        abort(abortNode);
146      }
147    } catch (KeeperException e) {
148      member.controllerConnectionFailure("Failed to list children for abort node:"
149          + zkController.getAbortZnode(), e, null);
150    }
151  }
152
153  private void waitForNewProcedures() {
154    // watch for new procedues that we need to start subprocedures for
155    LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
156    List<String> runningProcedures = null;
157    try {
158      runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
159        zkController.getAcquiredBarrier());
160      if (runningProcedures == null) {
161        LOG.debug("No running procedures.");
162        return;
163      }
164    } catch (KeeperException e) {
165      member.controllerConnectionFailure("General failure when watching for new procedures",
166        e, null);
167    }
168    if (runningProcedures == null) {
169      LOG.debug("No running procedures.");
170      return;
171    }
172    for (String procName : runningProcedures) {
173      // then read in the procedure information
174      String path = ZNodePaths.joinZNode(zkController.getAcquiredBarrier(), procName);
175      startNewSubprocedure(path);
176    }
177  }
178
179  /**
180   * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
181   * <p>
182   * Will attempt to create the same procedure multiple times if an procedure znode with the same
183   * name is created. It is left up the coordinator to ensure this doesn't occur.
184   * @param path full path to the znode for the procedure to start
185   */
186  private synchronized void startNewSubprocedure(String path) {
187    LOG.debug("Found procedure znode: " + path);
188    String opName = ZKUtil.getNodeName(path);
189    // start watching for an abort notification for the procedure
190    String abortZNode = zkController.getAbortZNode(opName);
191    try {
192      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
193        LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
194        return;
195      }
196    } catch (KeeperException e) {
197      member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
198          + ") for procedure :" + opName, e, opName);
199      return;
200    }
201
202    // get the data for the procedure
203    Subprocedure subproc = null;
204    try {
205      byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
206      if (!ProtobufUtil.isPBMagicPrefix(data)) {
207        String msg = "Data in for starting procedure " + opName +
208          " is illegally formatted (no pb magic). " +
209          "Killing the procedure: " + Bytes.toString(data);
210        LOG.error(msg);
211        throw new IllegalArgumentException(msg);
212      }
213      LOG.debug("start proc data length is " + data.length);
214      data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
215      LOG.debug("Found data for znode:" + path);
216      subproc = member.createSubprocedure(opName, data);
217      member.submitSubprocedure(subproc);
218    } catch (IllegalArgumentException iae ) {
219      LOG.error("Illegal argument exception", iae);
220      sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
221    } catch (IllegalStateException ise) {
222      LOG.error("Illegal state exception ", ise);
223      sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
224    } catch (KeeperException e) {
225      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
226        e, opName);
227    } catch (InterruptedException e) {
228      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
229        e, opName);
230      Thread.currentThread().interrupt();
231    }
232  }
233
234  /**
235   * This attempts to create an acquired state znode for the procedure (snapshot name).
236   *
237   * It then looks for the reached znode to trigger in-barrier execution.  If not present we
238   * have a watcher, if present then trigger the in-barrier action.
239   */
240  @Override
241  public void sendMemberAcquired(Subprocedure sub) throws IOException {
242    String procName = sub.getName();
243    try {
244      LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
245          + ") in zk");
246      String acquiredZNode = ZNodePaths.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
247        zkController, procName), memberName);
248      ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
249
250      // watch for the complete node for this snapshot
251      String reachedBarrier = zkController.getReachedBarrierNode(procName);
252      LOG.debug("Watch for global barrier reached:" + reachedBarrier);
253      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
254        receivedReachedGlobalBarrier(reachedBarrier);
255      }
256    } catch (KeeperException e) {
257      member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
258          + procName + " and member: " + memberName, e, procName);
259    }
260  }
261
262  /**
263   * This acts as the ack for a completed procedure
264   */
265  @Override
266  public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
267    String procName = sub.getName();
268    LOG.debug("Marking procedure  '" + procName + "' completed for member '" + memberName
269        + "' in zk");
270    String joinPath =
271      ZNodePaths.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
272    // ProtobufUtil.prependPBMagic does not take care of null
273    if (data == null) {
274      data = new byte[0];
275    }
276    try {
277      ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
278        ProtobufUtil.prependPBMagic(data));
279    } catch (KeeperException e) {
280      member.controllerConnectionFailure("Failed to post zk node:" + joinPath
281          + " to join procedure barrier.", e, procName);
282    }
283  }
284
285  /**
286   * This should be called by the member and should write a serialized root cause exception as
287   * to the abort znode.
288   */
289  @Override
290  public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
291    if (sub == null) {
292      LOG.error("Failed due to null subprocedure", ee);
293      return;
294    }
295    String procName = sub.getName();
296    LOG.debug("Aborting procedure (" + procName + ") in zk");
297    String procAbortZNode = zkController.getAbortZNode(procName);
298    try {
299      String source = (ee.getSource() == null) ? memberName: ee.getSource();
300      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
301      ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
302      LOG.debug("Finished creating abort znode:" + procAbortZNode);
303    } catch (KeeperException e) {
304      // possible that we get this error for the procedure if we already reset the zk state, but in
305      // that case we should still get an error for that procedure anyways
306      zkController.logZKTree(zkController.getBaseZnode());
307      member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
308          + " to abort procedure", e, procName);
309    }
310  }
311
312  /**
313   * Pass along the found abort notification to the listener
314   * @param abortZNode full znode path to the failed procedure information
315   */
316  protected void abort(String abortZNode) {
317    LOG.debug("Aborting procedure member for znode " + abortZNode);
318    String opName = ZKUtil.getNodeName(abortZNode);
319    try {
320      byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
321
322      // figure out the data we need to pass
323      ForeignException ee;
324      try {
325        if (data == null || data.length == 0) {
326          // ignore
327          return;
328        } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
329          String msg = "Illegally formatted data in abort node for proc " + opName
330              + ".  Killing the procedure.";
331          LOG.error(msg);
332          // we got a remote exception, but we can't describe it so just return exn from here
333          ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
334        } else {
335          data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
336          ee = ForeignException.deserialize(data);
337        }
338      } catch (IOException e) {
339        LOG.warn("Got an error notification for op:" + opName
340            + " but we can't read the information. Killing the procedure.");
341        // we got a remote exception, but we can't describe it so just return exn from here
342        ee = new ForeignException(getMemberName(), e);
343      }
344
345      this.member.receiveAbortProcedure(opName, ee);
346    } catch (KeeperException e) {
347      member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
348          + zkController.getAbortZnode(), e, opName);
349    } catch (InterruptedException e) {
350      LOG.warn("abort already in progress", e);
351      Thread.currentThread().interrupt();
352    }
353  }
354
355  @Override
356  public void start(final String memberName, final ProcedureMember listener) {
357    LOG.debug("Starting procedure member '" + memberName + "'");
358    this.member = listener;
359    this.memberName = memberName;
360    watchForAbortedProcedures();
361    waitForNewProcedures();
362  }
363
364  @Override
365  public void close() throws IOException {
366    zkController.close();
367  }
368
369}