001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import org.apache.hadoop.hbase.errorhandling.ForeignException;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.apache.hadoop.hbase.zookeeper.ZKUtil;
026import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
027import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.zookeeper.KeeperException;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
034
035/**
036 * ZooKeeper based controller for a procedure member.
037 * <p>
038 * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member, since each
039 * procedure type is bound to a single set of znodes. You can have multiple
040 * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member name, but each
041 * individual rpcs is still bound to a single member name (and since they are used to determine
042 * global progress, its important to not get this wrong).
043 * <p>
044 * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
045 * time (as long as they have different types), from the same controller, but the same node name
046 * must be used for each procedure (though there is no conflict between the two procedure as long as
047 * they have distinct names).
048 * <p>
049 * There is no real error recovery with this mechanism currently -- if any the coordinator fails,
050 * its re-initialization will delete the znodes and require all in progress subprocedures to start
051 * anew.
052 */
053@InterfaceAudience.Private
054public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
055  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureMemberRpcs.class);
056
057  private final ZKProcedureUtil zkController;
058
059  protected ProcedureMember member;
060  private String memberName;
061
062  /**
063   * Must call {@link #start(String, ProcedureMember)} before this can be used.
064   * @param watcher  {@link ZKWatcher} to be owned by <tt>this</tt>. Closed via {@link #close()}.
065   * @param procType name of the znode describing the procedure type
066   * @throws KeeperException if we can't reach zookeeper
067   */
068  public ZKProcedureMemberRpcs(final ZKWatcher watcher, final String procType)
069    throws KeeperException {
070    this.zkController = new ZKProcedureUtil(watcher, procType) {
071      @Override
072      public void nodeCreated(String path) {
073        if (!isInProcedurePath(path)) {
074          return;
075        }
076
077        LOG.info("Received created event:" + path);
078        // if it is a simple start/end/abort then we just rewatch the node
079        if (isAcquiredNode(path)) {
080          waitForNewProcedures();
081          return;
082        } else if (isAbortNode(path)) {
083          watchForAbortedProcedures();
084          return;
085        }
086        String parent = ZKUtil.getParent(path);
087        // if its the end barrier, the procedure can be completed
088        if (isReachedNode(parent)) {
089          receivedReachedGlobalBarrier(path);
090          return;
091        } else if (isAbortNode(parent)) {
092          abort(path);
093          return;
094        } else if (isAcquiredNode(parent)) {
095          startNewSubprocedure(path);
096        } else {
097          LOG.debug("Ignoring created notification for node:" + path);
098        }
099      }
100
101      @Override
102      public void nodeChildrenChanged(String path) {
103        if (path.equals(this.acquiredZnode)) {
104          LOG.info("Received procedure start children changed event: " + path);
105          waitForNewProcedures();
106        } else if (path.equals(this.abortZnode)) {
107          LOG.info("Received procedure abort children changed event: " + path);
108          watchForAbortedProcedures();
109        }
110      }
111    };
112  }
113
114  public ZKProcedureUtil getZkController() {
115    return zkController;
116  }
117
118  @Override
119  public String getMemberName() {
120    return memberName;
121  }
122
123  /**
124   * Pass along the procedure global barrier notification to any listeners
125   * @param path full znode path that cause the notification
126   */
127  private void receivedReachedGlobalBarrier(String path) {
128    LOG.debug("Received reached global barrier:" + path);
129    String procName = ZKUtil.getNodeName(path);
130    this.member.receivedReachedGlobalBarrier(procName);
131  }
132
133  private void watchForAbortedProcedures() {
134    LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
135    try {
136      // this is the list of the currently aborted procedues
137      List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
138        zkController.getAbortZnode());
139      if (children == null || children.isEmpty()) {
140        return;
141      }
142      for (String node : children) {
143        String abortNode = ZNodePaths.joinZNode(zkController.getAbortZnode(), node);
144        abort(abortNode);
145      }
146    } catch (KeeperException e) {
147      member.controllerConnectionFailure(
148        "Failed to list children for abort node:" + zkController.getAbortZnode(), e, null);
149    }
150  }
151
152  private void waitForNewProcedures() {
153    // watch for new procedues that we need to start subprocedures for
154    LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
155    List<String> runningProcedures = null;
156    try {
157      runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
158        zkController.getAcquiredBarrier());
159      if (runningProcedures == null) {
160        LOG.debug("No running procedures.");
161        return;
162      }
163    } catch (KeeperException e) {
164      member.controllerConnectionFailure("General failure when watching for new procedures", e,
165        null);
166    }
167    if (runningProcedures == null) {
168      LOG.debug("No running procedures.");
169      return;
170    }
171    for (String procName : runningProcedures) {
172      // then read in the procedure information
173      String path = ZNodePaths.joinZNode(zkController.getAcquiredBarrier(), procName);
174      startNewSubprocedure(path);
175    }
176  }
177
178  /**
179   * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
180   * <p>
181   * Will attempt to create the same procedure multiple times if an procedure znode with the same
182   * name is created. It is left up the coordinator to ensure this doesn't occur.
183   * @param path full path to the znode for the procedure to start
184   */
185  private synchronized void startNewSubprocedure(String path) {
186    LOG.debug("Found procedure znode: " + path);
187    String opName = ZKUtil.getNodeName(path);
188    // start watching for an abort notification for the procedure
189    String abortZNode = zkController.getAbortZNode(opName);
190    try {
191      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
192        LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
193        return;
194      }
195    } catch (KeeperException e) {
196      member.controllerConnectionFailure(
197        "Failed to get the abort znode (" + abortZNode + ") for procedure :" + opName, e, opName);
198      return;
199    }
200
201    // get the data for the procedure
202    Subprocedure subproc = null;
203    try {
204      byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
205      if (!ProtobufUtil.isPBMagicPrefix(data)) {
206        String msg =
207          "Data in for starting procedure " + opName + " is illegally formatted (no pb magic). "
208            + "Killing the procedure: " + Bytes.toString(data);
209        LOG.error(msg);
210        throw new IllegalArgumentException(msg);
211      }
212      LOG.debug("start proc data length is " + data.length);
213      data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
214      LOG.debug("Found data for znode:" + path);
215      subproc = member.createSubprocedure(opName, data);
216      member.submitSubprocedure(subproc);
217    } catch (IllegalArgumentException iae) {
218      LOG.error("Illegal argument exception", iae);
219      sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
220    } catch (IllegalStateException ise) {
221      LOG.error("Illegal state exception ", ise);
222      sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
223    } catch (KeeperException e) {
224      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName, e,
225        opName);
226    } catch (InterruptedException e) {
227      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName, e,
228        opName);
229      Thread.currentThread().interrupt();
230    }
231  }
232
233  /**
234   * This attempts to create an acquired state znode for the procedure (snapshot name). It then
235   * looks for the reached znode to trigger in-barrier execution. If not present we have a watcher,
236   * if present then trigger the in-barrier action.
237   */
238  @Override
239  public void sendMemberAcquired(Subprocedure sub) throws IOException {
240    String procName = sub.getName();
241    try {
242      LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
243        + ") in zk");
244      String acquiredZNode = ZNodePaths
245        .joinZNode(ZKProcedureUtil.getAcquireBarrierNode(zkController, procName), memberName);
246      ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
247
248      // watch for the complete node for this snapshot
249      String reachedBarrier = zkController.getReachedBarrierNode(procName);
250      LOG.debug("Watch for global barrier reached:" + reachedBarrier);
251      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
252        receivedReachedGlobalBarrier(reachedBarrier);
253      }
254    } catch (KeeperException e) {
255      member.controllerConnectionFailure(
256        "Failed to acquire barrier for procedure: " + procName + " and member: " + memberName, e,
257        procName);
258    }
259  }
260
261  /**
262   * This acts as the ack for a completed procedure
263   */
264  @Override
265  public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
266    String procName = sub.getName();
267    LOG.debug(
268      "Marking procedure  '" + procName + "' completed for member '" + memberName + "' in zk");
269    String joinPath =
270      ZNodePaths.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
271    // ProtobufUtil.prependPBMagic does not take care of null
272    if (data == null) {
273      data = new byte[0];
274    }
275    try {
276      ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
277        ProtobufUtil.prependPBMagic(data));
278    } catch (KeeperException e) {
279      member.controllerConnectionFailure(
280        "Failed to post zk node:" + joinPath + " to join procedure barrier.", e, procName);
281    }
282  }
283
284  /**
285   * This should be called by the member and should write a serialized root cause exception as to
286   * the abort znode.
287   */
288  @Override
289  public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
290    if (sub == null) {
291      LOG.error("Failed due to null subprocedure", ee);
292      return;
293    }
294    String procName = sub.getName();
295    LOG.debug("Aborting procedure (" + procName + ") in zk");
296    String procAbortZNode = zkController.getAbortZNode(procName);
297    try {
298      String source = (ee.getSource() == null) ? memberName : ee.getSource();
299      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
300      ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
301      LOG.debug("Finished creating abort znode:" + procAbortZNode);
302    } catch (KeeperException e) {
303      // possible that we get this error for the procedure if we already reset the zk state, but in
304      // that case we should still get an error for that procedure anyways
305      zkController.logZKTree(zkController.getBaseZnode());
306      member.controllerConnectionFailure(
307        "Failed to post zk node:" + procAbortZNode + " to abort procedure", e, procName);
308    }
309  }
310
311  /**
312   * Pass along the found abort notification to the listener
313   * @param abortZNode full znode path to the failed procedure information
314   */
315  protected void abort(String abortZNode) {
316    LOG.debug("Aborting procedure member for znode " + abortZNode);
317    String opName = ZKUtil.getNodeName(abortZNode);
318    try {
319      byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
320
321      // figure out the data we need to pass
322      ForeignException ee;
323      try {
324        if (data == null || data.length == 0) {
325          // ignore
326          return;
327        } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
328          String msg = "Illegally formatted data in abort node for proc " + opName
329            + ".  Killing the procedure.";
330          LOG.error(msg);
331          // we got a remote exception, but we can't describe it so just return exn from here
332          ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
333        } else {
334          data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
335          ee = ForeignException.deserialize(data);
336        }
337      } catch (IOException e) {
338        LOG.warn("Got an error notification for op:" + opName
339          + " but we can't read the information. Killing the procedure.");
340        // we got a remote exception, but we can't describe it so just return exn from here
341        ee = new ForeignException(getMemberName(), e);
342      }
343
344      this.member.receiveAbortProcedure(opName, ee);
345    } catch (KeeperException e) {
346      member.controllerConnectionFailure(
347        "Failed to get data for abort znode:" + abortZNode + zkController.getAbortZnode(), e,
348        opName);
349    } catch (InterruptedException e) {
350      LOG.warn("abort already in progress", e);
351      Thread.currentThread().interrupt();
352    }
353  }
354
355  @Override
356  public void start(final String memberName, final ProcedureMember listener) {
357    LOG.debug("Starting procedure member '" + memberName + "'");
358    this.member = listener;
359    this.memberName = memberName;
360    watchForAbortedProcedures();
361    waitForNewProcedures();
362  }
363
364  @Override
365  public void close() throws IOException {
366    zkController.close();
367  }
368
369}