001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023
024import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.hadoop.hbase.errorhandling.ForeignException;
027import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.zookeeper.ZKUtil;
030import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
031import org.apache.zookeeper.KeeperException;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * ZooKeeper based controller for a procedure member.
037 * <p>
038 * There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member,
039 * since each procedure type is bound to a single set of znodes. You can have multiple
040 * {@link ZKProcedureMemberRpcs} on the same server, each serving a different member
041 * name, but each individual rpcs is still bound to a single member name (and since they are
042 * used to determine global progress, its important to not get this wrong).
043 * <p>
044 * To make this slightly more confusing, you can run multiple, concurrent procedures at the same
045 * time (as long as they have different types), from the same controller, but the same node name
046 * must be used for each procedure (though there is no conflict between the two procedure as long
047 * as they have distinct names).
048 * <p>
049 * There is no real error recovery with this mechanism currently -- if any the coordinator fails,
050 * its re-initialization will delete the znodes and require all in progress subprocedures to start
051 * anew.
052 */
053@InterfaceAudience.Private
054public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
055  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureMemberRpcs.class);
056
057  private final ZKProcedureUtil zkController;
058
059  protected ProcedureMember member;
060  private String memberName;
061
062  /**
063   * Must call {@link #start(String, ProcedureMember)} before this can be used.
064   * @param watcher {@link ZKWatcher} to be owned by <tt>this</tt>. Closed via
065   *          {@link #close()}.
066   * @param procType name of the znode describing the procedure type
067   * @throws KeeperException if we can't reach zookeeper
068   */
069  public ZKProcedureMemberRpcs(final ZKWatcher watcher, final String procType)
070      throws KeeperException {
071    this.zkController = new ZKProcedureUtil(watcher, procType) {
072      @Override
073      public void nodeCreated(String path) {
074        if (!isInProcedurePath(path)) {
075          return;
076        }
077
078        LOG.info("Received created event:" + path);
079        // if it is a simple start/end/abort then we just rewatch the node
080        if (isAcquiredNode(path)) {
081          waitForNewProcedures();
082          return;
083        } else if (isAbortNode(path)) {
084          watchForAbortedProcedures();
085          return;
086        }
087        String parent = ZKUtil.getParent(path);
088        // if its the end barrier, the procedure can be completed
089        if (isReachedNode(parent)) {
090          receivedReachedGlobalBarrier(path);
091          return;
092        } else if (isAbortNode(parent)) {
093          abort(path);
094          return;
095        } else if (isAcquiredNode(parent)) {
096          startNewSubprocedure(path);
097        } else {
098          LOG.debug("Ignoring created notification for node:" + path);
099        }
100      }
101
102      @Override
103      public void nodeChildrenChanged(String path) {
104        if (path.equals(this.acquiredZnode)) {
105          LOG.info("Received procedure start children changed event: " + path);
106          waitForNewProcedures();
107        } else if (path.equals(this.abortZnode)) {
108          LOG.info("Received procedure abort children changed event: " + path);
109          watchForAbortedProcedures();
110        }
111      }
112    };
113  }
114
115  public ZKProcedureUtil getZkController() {
116    return zkController;
117  }
118
119  @Override
120  public String getMemberName() {
121    return memberName;
122  }
123
124  /**
125   * Pass along the procedure global barrier notification to any listeners
126   * @param path full znode path that cause the notification
127   */
128  private void receivedReachedGlobalBarrier(String path) {
129    LOG.debug("Received reached global barrier:" + path);
130    String procName = ZKUtil.getNodeName(path);
131    this.member.receivedReachedGlobalBarrier(procName);
132  }
133
134  private void watchForAbortedProcedures() {
135    LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
136    try {
137      // this is the list of the currently aborted procedues
138      for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
139        zkController.getAbortZnode())) {
140        String abortNode = ZNodePaths.joinZNode(zkController.getAbortZnode(), node);
141        abort(abortNode);
142      }
143    } catch (KeeperException e) {
144      member.controllerConnectionFailure("Failed to list children for abort node:"
145          + zkController.getAbortZnode(), e, null);
146    }
147  }
148
149  private void waitForNewProcedures() {
150    // watch for new procedues that we need to start subprocedures for
151    LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
152    List<String> runningProcedures = null;
153    try {
154      runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
155        zkController.getAcquiredBarrier());
156      if (runningProcedures == null) {
157        LOG.debug("No running procedures.");
158        return;
159      }
160    } catch (KeeperException e) {
161      member.controllerConnectionFailure("General failure when watching for new procedures",
162        e, null);
163    }
164    if (runningProcedures == null) {
165      LOG.debug("No running procedures.");
166      return;
167    }
168    for (String procName : runningProcedures) {
169      // then read in the procedure information
170      String path = ZNodePaths.joinZNode(zkController.getAcquiredBarrier(), procName);
171      startNewSubprocedure(path);
172    }
173  }
174
175  /**
176   * Kick off a new sub-procedure on the listener with the data stored in the passed znode.
177   * <p>
178   * Will attempt to create the same procedure multiple times if an procedure znode with the same
179   * name is created. It is left up the coordinator to ensure this doesn't occur.
180   * @param path full path to the znode for the procedure to start
181   */
182  private synchronized void startNewSubprocedure(String path) {
183    LOG.debug("Found procedure znode: " + path);
184    String opName = ZKUtil.getNodeName(path);
185    // start watching for an abort notification for the procedure
186    String abortZNode = zkController.getAbortZNode(opName);
187    try {
188      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
189        LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
190        return;
191      }
192    } catch (KeeperException e) {
193      member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
194          + ") for procedure :" + opName, e, opName);
195      return;
196    }
197
198    // get the data for the procedure
199    Subprocedure subproc = null;
200    try {
201      byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
202      if (!ProtobufUtil.isPBMagicPrefix(data)) {
203        String msg = "Data in for starting procedure " + opName +
204          " is illegally formatted (no pb magic). " +
205          "Killing the procedure: " + Bytes.toString(data);
206        LOG.error(msg);
207        throw new IllegalArgumentException(msg);
208      }
209      LOG.debug("start proc data length is " + data.length);
210      data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
211      LOG.debug("Found data for znode:" + path);
212      subproc = member.createSubprocedure(opName, data);
213      member.submitSubprocedure(subproc);
214    } catch (IllegalArgumentException iae ) {
215      LOG.error("Illegal argument exception", iae);
216      sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
217    } catch (IllegalStateException ise) {
218      LOG.error("Illegal state exception ", ise);
219      sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
220    } catch (KeeperException e) {
221      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
222        e, opName);
223    } catch (InterruptedException e) {
224      member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
225        e, opName);
226      Thread.currentThread().interrupt();
227    }
228  }
229
230  /**
231   * This attempts to create an acquired state znode for the procedure (snapshot name).
232   *
233   * It then looks for the reached znode to trigger in-barrier execution.  If not present we
234   * have a watcher, if present then trigger the in-barrier action.
235   */
236  @Override
237  public void sendMemberAcquired(Subprocedure sub) throws IOException {
238    String procName = sub.getName();
239    try {
240      LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
241          + ") in zk");
242      String acquiredZNode = ZNodePaths.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
243        zkController, procName), memberName);
244      ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
245
246      // watch for the complete node for this snapshot
247      String reachedBarrier = zkController.getReachedBarrierNode(procName);
248      LOG.debug("Watch for global barrier reached:" + reachedBarrier);
249      if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
250        receivedReachedGlobalBarrier(reachedBarrier);
251      }
252    } catch (KeeperException e) {
253      member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
254          + procName + " and member: " + memberName, e, procName);
255    }
256  }
257
258  /**
259   * This acts as the ack for a completed procedure
260   */
261  @Override
262  public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
263    String procName = sub.getName();
264    LOG.debug("Marking procedure  '" + procName + "' completed for member '" + memberName
265        + "' in zk");
266    String joinPath =
267      ZNodePaths.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
268    // ProtobufUtil.prependPBMagic does not take care of null
269    if (data == null) {
270      data = new byte[0];
271    }
272    try {
273      ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
274        ProtobufUtil.prependPBMagic(data));
275    } catch (KeeperException e) {
276      member.controllerConnectionFailure("Failed to post zk node:" + joinPath
277          + " to join procedure barrier.", e, procName);
278    }
279  }
280
281  /**
282   * This should be called by the member and should write a serialized root cause exception as
283   * to the abort znode.
284   */
285  @Override
286  public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
287    if (sub == null) {
288      LOG.error("Failed due to null subprocedure", ee);
289      return;
290    }
291    String procName = sub.getName();
292    LOG.debug("Aborting procedure (" + procName + ") in zk");
293    String procAbortZNode = zkController.getAbortZNode(procName);
294    try {
295      String source = (ee.getSource() == null) ? memberName: ee.getSource();
296      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
297      ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
298      LOG.debug("Finished creating abort znode:" + procAbortZNode);
299    } catch (KeeperException e) {
300      // possible that we get this error for the procedure if we already reset the zk state, but in
301      // that case we should still get an error for that procedure anyways
302      zkController.logZKTree(zkController.getBaseZnode());
303      member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
304          + " to abort procedure", e, procName);
305    }
306  }
307
308  /**
309   * Pass along the found abort notification to the listener
310   * @param abortZNode full znode path to the failed procedure information
311   */
312  protected void abort(String abortZNode) {
313    LOG.debug("Aborting procedure member for znode " + abortZNode);
314    String opName = ZKUtil.getNodeName(abortZNode);
315    try {
316      byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
317
318      // figure out the data we need to pass
319      ForeignException ee;
320      try {
321        if (data == null || data.length == 0) {
322          // ignore
323          return;
324        } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
325          String msg = "Illegally formatted data in abort node for proc " + opName
326              + ".  Killing the procedure.";
327          LOG.error(msg);
328          // we got a remote exception, but we can't describe it so just return exn from here
329          ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
330        } else {
331          data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
332          ee = ForeignException.deserialize(data);
333        }
334      } catch (IOException e) {
335        LOG.warn("Got an error notification for op:" + opName
336            + " but we can't read the information. Killing the procedure.");
337        // we got a remote exception, but we can't describe it so just return exn from here
338        ee = new ForeignException(getMemberName(), e);
339      }
340
341      this.member.receiveAbortProcedure(opName, ee);
342    } catch (KeeperException e) {
343      member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
344          + zkController.getAbortZnode(), e, opName);
345    } catch (InterruptedException e) {
346      LOG.warn("abort already in progress", e);
347      Thread.currentThread().interrupt();
348    }
349  }
350
351  @Override
352  public void start(final String memberName, final ProcedureMember listener) {
353    LOG.debug("Starting procedure member '" + memberName + "'");
354    this.member = listener;
355    this.memberName = memberName;
356    watchForAbortedProcedures();
357    waitForNewProcedures();
358  }
359
360  @Override
361  public void close() throws IOException {
362    zkController.close();
363  }
364
365}