001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.nio.charset.StandardCharsets;
023import java.util.Arrays;
024import java.util.List;
025
026import org.apache.hadoop.hbase.errorhandling.ForeignException;
027import org.apache.hadoop.hbase.zookeeper.ZKUtil;
028import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
029import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.zookeeper.KeeperException;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
036
037/**
038 * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
039 */
040@InterfaceAudience.Private
041public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs {
042  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureCoordinator.class);
043  private ZKProcedureUtil zkProc = null;
044  protected ProcedureCoordinator coordinator = null;  // if started this should be non-null
045
046  ZKWatcher watcher;
047  String procedureType;
048  String coordName;
049
050  /**
051   * @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
052   * @param procedureClass procedure type name is a category for when there are multiple kinds of
053   *    procedures.-- this becomes a znode so be aware of the naming restrictions
054   * @param coordName name of the node running the coordinator
055   * @throws KeeperException if an unexpected zk error occurs
056   */
057  public ZKProcedureCoordinator(ZKWatcher watcher,
058      String procedureClass, String coordName) {
059    this.watcher = watcher;
060    this.procedureType = procedureClass;
061    this.coordName = coordName;
062  }
063
064  /**
065   * The "acquire" phase.  The coordinator creates a new procType/acquired/ znode dir. If znodes
066   * appear, first acquire to relevant listener or sets watch waiting for notification of
067   * the acquire node
068   *
069   * @param proc the Procedure
070   * @param info data to be stored in the acquire node
071   * @param nodeNames children of the acquire phase
072   * @throws IOException if any failure occurs.
073   */
074  @Override
075  final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
076      throws IOException, IllegalArgumentException {
077    String procName = proc.getName();
078    // start watching for the abort node
079    String abortNode = zkProc.getAbortZNode(procName);
080    try {
081      // check to see if the abort node already exists
082      if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
083        abort(abortNode);
084      }
085      // If we get an abort node watch triggered here, we'll go complete creating the acquired
086      // znode but then handle the acquire znode and bail out
087    } catch (KeeperException e) {
088      String msg = "Failed while watching abort node:" + abortNode;
089      LOG.error(msg, e);
090      throw new IOException(msg, e);
091    }
092
093    // create the acquire barrier
094    String acquire = zkProc.getAcquiredBarrierNode(procName);
095    LOG.debug("Creating acquire znode:" + acquire);
096    try {
097      // notify all the procedure listeners to look for the acquire node
098      byte[] data = ProtobufUtil.prependPBMagic(info);
099      ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
100      // loop through all the children of the acquire phase and watch for them
101      for (String node : nodeNames) {
102        String znode = ZNodePaths.joinZNode(acquire, node);
103        LOG.debug("Watching for acquire node:" + znode);
104        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
105          coordinator.memberAcquiredBarrier(procName, node);
106        }
107      }
108    } catch (KeeperException e) {
109      String msg = "Failed while creating acquire node:" + acquire;
110      LOG.error(msg, e);
111      throw new IOException(msg, e);
112    }
113  }
114
115  @Override
116  public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
117    String procName = proc.getName();
118    String reachedNode = zkProc.getReachedBarrierNode(procName);
119    LOG.debug("Creating reached barrier zk node:" + reachedNode);
120    try {
121      // create the reached znode and watch for the reached znodes
122      ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
123      // loop through all the children of the acquire phase and watch for them
124      for (String node : nodeNames) {
125        String znode = ZNodePaths.joinZNode(reachedNode, node);
126        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
127          byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
128          // ProtobufUtil.isPBMagicPrefix will check null
129          if (dataFromMember != null && dataFromMember.length > 0) {
130            if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
131              String msg =
132                "Failed to get data from finished node or data is illegally formatted: " + znode;
133              LOG.error(msg);
134              throw new IOException(msg);
135            } else {
136              dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
137                dataFromMember.length);
138              coordinator.memberFinishedBarrier(procName, node, dataFromMember);
139            }
140          } else {
141            coordinator.memberFinishedBarrier(procName, node, dataFromMember);
142          }
143        }
144      }
145    } catch (KeeperException e) {
146      String msg = "Failed while creating reached node:" + reachedNode;
147      LOG.error(msg, e);
148      throw new IOException(msg, e);
149    } catch (InterruptedException e) {
150      String msg = "Interrupted while creating reached node:" + reachedNode;
151      LOG.error(msg, e);
152      throw new InterruptedIOException(msg);
153    }
154  }
155
156
157  /**
158   * Delete znodes that are no longer in use.
159   */
160  @Override
161  final public void resetMembers(Procedure proc) throws IOException {
162    String procName = proc.getName();
163    boolean stillGettingNotifications = false;
164    do {
165      try {
166        LOG.debug("Attempting to clean out zk node for op:" + procName);
167        zkProc.clearZNodes(procName);
168        stillGettingNotifications = false;
169      } catch (KeeperException.NotEmptyException e) {
170        // recursive delete isn't transactional (yet) so we need to deal with cases where we get
171        // children trickling in
172        stillGettingNotifications = true;
173      } catch (KeeperException e) {
174        String msg = "Failed to complete reset procedure " + procName;
175        LOG.error(msg, e);
176        throw new IOException(msg, e);
177      }
178    } while (stillGettingNotifications);
179  }
180
181  /**
182   * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
183   * @return true if succeed, false if encountered initialization errors.
184   */
185  @Override
186  final public boolean start(final ProcedureCoordinator coordinator) {
187    if (this.coordinator != null) {
188      throw new IllegalStateException(
189        "ZKProcedureCoordinator already started and already has listener installed");
190    }
191    this.coordinator = coordinator;
192
193    try {
194      this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
195        @Override
196        public void nodeCreated(String path) {
197          if (!isInProcedurePath(path)) return;
198          LOG.debug("Node created: " + path);
199          logZKTree(this.baseZNode);
200          if (isAcquiredPathNode(path)) {
201            // node wasn't present when we created the watch so zk event triggers acquire
202            coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
203              ZKUtil.getNodeName(path));
204          } else if (isReachedPathNode(path)) {
205            // node was absent when we created the watch so zk event triggers the finished barrier.
206
207            // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
208            String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
209            String member = ZKUtil.getNodeName(path);
210            // get the data from the procedure member
211            try {
212              byte[] dataFromMember = ZKUtil.getData(watcher, path);
213              // ProtobufUtil.isPBMagicPrefix will check null
214              if (dataFromMember != null && dataFromMember.length > 0) {
215                if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
216                  ForeignException ee = new ForeignException(coordName,
217                    "Failed to get data from finished node or data is illegally formatted:"
218                        + path);
219                  coordinator.abortProcedure(procName, ee);
220                } else {
221                  dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
222                    dataFromMember.length);
223                  LOG.debug("Finished data from procedure '{}' member '{}': {}", procName, member,
224                      new String(dataFromMember, StandardCharsets.UTF_8));
225                  coordinator.memberFinishedBarrier(procName, member, dataFromMember);
226                }
227              } else {
228                coordinator.memberFinishedBarrier(procName, member, dataFromMember);
229              }
230            } catch (KeeperException e) {
231              ForeignException ee = new ForeignException(coordName, e);
232              coordinator.abortProcedure(procName, ee);
233            } catch (InterruptedException e) {
234              ForeignException ee = new ForeignException(coordName, e);
235              coordinator.abortProcedure(procName, ee);
236            }
237          } else if (isAbortPathNode(path)) {
238            abort(path);
239          } else {
240            LOG.debug("Ignoring created notification for node:" + path);
241          }
242        }
243      };
244      zkProc.clearChildZNodes();
245    } catch (KeeperException e) {
246      LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
247      return false;
248    }
249
250    LOG.debug("Starting controller for procedure member=" + coordName);
251    return true;
252  }
253
254  /**
255   * This is the abort message being sent by the coordinator to member
256   *
257   * TODO this code isn't actually used but can be used to issue a cancellation from the
258   * coordinator.
259   */
260  @Override
261  final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
262    String procName = proc.getName();
263    LOG.debug("Aborting procedure '" + procName + "' in zk");
264    String procAbortNode = zkProc.getAbortZNode(procName);
265    try {
266      LOG.debug("Creating abort znode:" + procAbortNode);
267      String source = (ee.getSource() == null) ? coordName : ee.getSource();
268      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
269      // first create the znode for the procedure
270      ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
271      LOG.debug("Finished creating abort node:" + procAbortNode);
272    } catch (KeeperException e) {
273      // possible that we get this error for the procedure if we already reset the zk state, but in
274      // that case we should still get an error for that procedure anyways
275      zkProc.logZKTree(zkProc.baseZNode);
276      coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
277          + " to abort procedure '" + procName + "'", new IOException(e));
278    }
279  }
280
281  /**
282   * Receive a notification and propagate it to the local coordinator
283   * @param abortNode full znode path to the failed procedure information
284   */
285  protected void abort(String abortNode) {
286    String procName = ZKUtil.getNodeName(abortNode);
287    ForeignException ee = null;
288    try {
289      byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
290      if (data == null || data.length == 0) {
291        // ignore
292        return;
293      } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
294        LOG.warn("Got an error notification for op:" + abortNode
295            + " but we can't read the information. Killing the procedure.");
296        // we got a remote exception, but we can't describe it
297        ee = new ForeignException(coordName,
298          "Data in abort node is illegally formatted.  ignoring content.");
299      } else {
300
301        data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
302        ee = ForeignException.deserialize(data);
303      }
304    } catch (IOException e) {
305      LOG.warn("Got an error notification for op:" + abortNode
306          + " but we can't read the information. Killing the procedure.");
307      // we got a remote exception, but we can't describe it
308      ee = new ForeignException(coordName, e);
309    } catch (KeeperException e) {
310      coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
311          + zkProc.getAbortZnode(), new IOException(e));
312    } catch (InterruptedException e) {
313      coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
314          + zkProc.getAbortZnode(), new IOException(e));
315      Thread.currentThread().interrupt();
316    }
317    coordinator.abortProcedure(procName, ee);
318  }
319
320  @Override
321  final public void close() throws IOException {
322    zkProc.close();
323  }
324
325  /**
326   * Used in testing
327   */
328  final ZKProcedureUtil getZkProcedureUtil() {
329    return zkProc;
330  }
331}