001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.nio.charset.StandardCharsets;
023import java.util.Arrays;
024import java.util.List;
025import org.apache.hadoop.hbase.errorhandling.ForeignException;
026import org.apache.hadoop.hbase.zookeeper.ZKUtil;
027import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
028import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.zookeeper.KeeperException;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
035
036/**
037 * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
038 */
039@InterfaceAudience.Private
040public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs {
041  private static final Logger LOG = LoggerFactory.getLogger(ZKProcedureCoordinator.class);
042  private ZKProcedureUtil zkProc = null;
043  protected ProcedureCoordinator coordinator = null; // if started this should be non-null
044
045  ZKWatcher watcher;
046  String procedureType;
047  String coordName;
048
049  /**
050   * @param watcher        zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
051   * @param procedureClass procedure type name is a category for when there are multiple kinds of
052   *                       procedures.-- this becomes a znode so be aware of the naming restrictions
053   * @param coordName      name of the node running the coordinator
054   * @throws KeeperException if an unexpected zk error occurs
055   */
056  public ZKProcedureCoordinator(ZKWatcher watcher, String procedureClass, String coordName) {
057    this.watcher = watcher;
058    this.procedureType = procedureClass;
059    this.coordName = coordName;
060  }
061
062  /**
063   * The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. If znodes
064   * appear, first acquire to relevant listener or sets watch waiting for notification of the
065   * acquire node
066   * @param proc      the Procedure
067   * @param info      data to be stored in the acquire node
068   * @param nodeNames children of the acquire phase
069   * @throws IOException if any failure occurs.
070   */
071  @Override
072  final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
073    throws IOException, IllegalArgumentException {
074    String procName = proc.getName();
075    // start watching for the abort node
076    String abortNode = zkProc.getAbortZNode(procName);
077    try {
078      // check to see if the abort node already exists
079      if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
080        abort(abortNode);
081      }
082      // If we get an abort node watch triggered here, we'll go complete creating the acquired
083      // znode but then handle the acquire znode and bail out
084    } catch (KeeperException e) {
085      String msg = "Failed while watching abort node:" + abortNode;
086      LOG.error(msg, e);
087      throw new IOException(msg, e);
088    }
089
090    // create the acquire barrier
091    String acquire = zkProc.getAcquiredBarrierNode(procName);
092    LOG.debug("Creating acquire znode:" + acquire);
093    try {
094      // notify all the procedure listeners to look for the acquire node
095      byte[] data = ProtobufUtil.prependPBMagic(info);
096      ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
097      // loop through all the children of the acquire phase and watch for them
098      for (String node : nodeNames) {
099        String znode = ZNodePaths.joinZNode(acquire, node);
100        LOG.debug("Watching for acquire node:" + znode);
101        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
102          coordinator.memberAcquiredBarrier(procName, node);
103        }
104      }
105    } catch (KeeperException e) {
106      String msg = "Failed while creating acquire node:" + acquire;
107      LOG.error(msg, e);
108      throw new IOException(msg, e);
109    }
110  }
111
112  @Override
113  public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
114    String procName = proc.getName();
115    String reachedNode = zkProc.getReachedBarrierNode(procName);
116    LOG.debug("Creating reached barrier zk node:" + reachedNode);
117    try {
118      // create the reached znode and watch for the reached znodes
119      ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
120      // loop through all the children of the acquire phase and watch for them
121      for (String node : nodeNames) {
122        String znode = ZNodePaths.joinZNode(reachedNode, node);
123        if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
124          byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
125          // ProtobufUtil.isPBMagicPrefix will check null
126          if (dataFromMember != null && dataFromMember.length > 0) {
127            if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
128              String msg =
129                "Failed to get data from finished node or data is illegally formatted: " + znode;
130              LOG.error(msg);
131              throw new IOException(msg);
132            } else {
133              dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
134                dataFromMember.length);
135              coordinator.memberFinishedBarrier(procName, node, dataFromMember);
136            }
137          } else {
138            coordinator.memberFinishedBarrier(procName, node, dataFromMember);
139          }
140        }
141      }
142    } catch (KeeperException e) {
143      String msg = "Failed while creating reached node:" + reachedNode;
144      LOG.error(msg, e);
145      throw new IOException(msg, e);
146    } catch (InterruptedException e) {
147      String msg = "Interrupted while creating reached node:" + reachedNode;
148      LOG.error(msg, e);
149      throw new InterruptedIOException(msg);
150    }
151  }
152
153  /**
154   * Delete znodes that are no longer in use.
155   */
156  @Override
157  final public void resetMembers(Procedure proc) throws IOException {
158    String procName = proc.getName();
159    boolean stillGettingNotifications = false;
160    do {
161      try {
162        LOG.debug("Attempting to clean out zk node for op:" + procName);
163        zkProc.clearZNodes(procName);
164        stillGettingNotifications = false;
165      } catch (KeeperException.NotEmptyException e) {
166        // recursive delete isn't transactional (yet) so we need to deal with cases where we get
167        // children trickling in
168        stillGettingNotifications = true;
169      } catch (KeeperException e) {
170        String msg = "Failed to complete reset procedure " + procName;
171        LOG.error(msg, e);
172        throw new IOException(msg, e);
173      }
174    } while (stillGettingNotifications);
175  }
176
177  /**
178   * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
179   * @return true if succeed, false if encountered initialization errors.
180   */
181  @Override
182  final public boolean start(final ProcedureCoordinator coordinator) {
183    if (this.coordinator != null) {
184      throw new IllegalStateException(
185        "ZKProcedureCoordinator already started and already has listener installed");
186    }
187    this.coordinator = coordinator;
188
189    try {
190      this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
191        @Override
192        public void nodeCreated(String path) {
193          if (!isInProcedurePath(path)) return;
194          LOG.debug("Node created: " + path);
195          logZKTree(this.baseZNode);
196          if (isAcquiredPathNode(path)) {
197            // node wasn't present when we created the watch so zk event triggers acquire
198            coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
199              ZKUtil.getNodeName(path));
200          } else if (isReachedPathNode(path)) {
201            // node was absent when we created the watch so zk event triggers the finished barrier.
202
203            // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
204            String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
205            String member = ZKUtil.getNodeName(path);
206            // get the data from the procedure member
207            try {
208              byte[] dataFromMember = ZKUtil.getData(watcher, path);
209              // ProtobufUtil.isPBMagicPrefix will check null
210              if (dataFromMember != null && dataFromMember.length > 0) {
211                if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
212                  ForeignException ee = new ForeignException(coordName,
213                    "Failed to get data from finished node or data is illegally formatted:" + path);
214                  coordinator.abortProcedure(procName, ee);
215                } else {
216                  dataFromMember = Arrays.copyOfRange(dataFromMember,
217                    ProtobufUtil.lengthOfPBMagic(), dataFromMember.length);
218                  LOG.debug("Finished data from procedure '{}' member '{}': {}", procName, member,
219                    new String(dataFromMember, StandardCharsets.UTF_8));
220                  coordinator.memberFinishedBarrier(procName, member, dataFromMember);
221                }
222              } else {
223                coordinator.memberFinishedBarrier(procName, member, dataFromMember);
224              }
225            } catch (KeeperException e) {
226              ForeignException ee = new ForeignException(coordName, e);
227              coordinator.abortProcedure(procName, ee);
228            } catch (InterruptedException e) {
229              ForeignException ee = new ForeignException(coordName, e);
230              coordinator.abortProcedure(procName, ee);
231            }
232          } else if (isAbortPathNode(path)) {
233            abort(path);
234          } else {
235            LOG.debug("Ignoring created notification for node:" + path);
236          }
237        }
238      };
239      zkProc.clearChildZNodes();
240    } catch (KeeperException e) {
241      LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
242      return false;
243    }
244
245    LOG.debug("Starting controller for procedure member=" + coordName);
246    return true;
247  }
248
249  /**
250   * This is the abort message being sent by the coordinator to member TODO this code isn't actually
251   * used but can be used to issue a cancellation from the coordinator.
252   */
253  @Override
254  final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
255    String procName = proc.getName();
256    LOG.debug("Aborting procedure '" + procName + "' in zk");
257    String procAbortNode = zkProc.getAbortZNode(procName);
258    try {
259      LOG.debug("Creating abort znode:" + procAbortNode);
260      String source = (ee.getSource() == null) ? coordName : ee.getSource();
261      byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
262      // first create the znode for the procedure
263      ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
264      LOG.debug("Finished creating abort node:" + procAbortNode);
265    } catch (KeeperException e) {
266      // possible that we get this error for the procedure if we already reset the zk state, but in
267      // that case we should still get an error for that procedure anyways
268      zkProc.logZKTree(zkProc.baseZNode);
269      coordinator.rpcConnectionFailure(
270        "Failed to post zk node:" + procAbortNode + " to abort procedure '" + procName + "'",
271        new IOException(e));
272    }
273  }
274
275  /**
276   * Receive a notification and propagate it to the local coordinator
277   * @param abortNode full znode path to the failed procedure information
278   */
279  protected void abort(String abortNode) {
280    String procName = ZKUtil.getNodeName(abortNode);
281    ForeignException ee = null;
282    try {
283      byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
284      if (data == null || data.length == 0) {
285        // ignore
286        return;
287      } else if (!ProtobufUtil.isPBMagicPrefix(data)) {
288        LOG.warn("Got an error notification for op:" + abortNode
289          + " but we can't read the information. Killing the procedure.");
290        // we got a remote exception, but we can't describe it
291        ee = new ForeignException(coordName,
292          "Data in abort node is illegally formatted.  ignoring content.");
293      } else {
294
295        data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
296        ee = ForeignException.deserialize(data);
297      }
298    } catch (IOException e) {
299      LOG.warn("Got an error notification for op:" + abortNode
300        + " but we can't read the information. Killing the procedure.");
301      // we got a remote exception, but we can't describe it
302      ee = new ForeignException(coordName, e);
303    } catch (KeeperException e) {
304      coordinator.rpcConnectionFailure(
305        "Failed to get data for abort node:" + abortNode + zkProc.getAbortZnode(),
306        new IOException(e));
307    } catch (InterruptedException e) {
308      coordinator.rpcConnectionFailure(
309        "Failed to get data for abort node:" + abortNode + zkProc.getAbortZnode(),
310        new IOException(e));
311      Thread.currentThread().interrupt();
312    }
313    coordinator.abortProcedure(procName, ee);
314  }
315
316  @Override
317  final public void close() throws IOException {
318    zkProc.close();
319  }
320
321  /**
322   * Used in testing
323   */
324  final ZKProcedureUtil getZkProcedureUtil() {
325    return zkProc;
326  }
327}