001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.CountDownLatch;
027import java.util.regex.Matcher;
028import java.util.regex.Pattern;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.AuthUtil;
033import org.apache.hadoop.hbase.ZooKeeperConnectionException;
034import org.apache.hadoop.hbase.security.Superusers;
035import org.apache.hadoop.security.UserGroupInformation;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.zookeeper.KeeperException;
038import org.apache.zookeeper.WatchedEvent;
039import org.apache.zookeeper.Watcher;
040import org.apache.zookeeper.ZooDefs.Ids;
041import org.apache.zookeeper.ZooDefs.Perms;
042import org.apache.zookeeper.data.ACL;
043import org.apache.zookeeper.data.Id;
044import org.apache.zookeeper.data.Stat;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
050 * for each Master, RegionServer, and client process.
051 *
052 * <p>This is the only class that implements {@link Watcher}.  Other internal
053 * classes which need to be notified of ZooKeeper events must register with
054 * the local instance of this watcher via {@link #registerListener}.
055 *
056 * <p>This class also holds and manages the connection to ZooKeeper.  Code to
057 * deal with connection related events and exceptions are handled here.
058 */
059@InterfaceAudience.Private
060public class ZKWatcher implements Watcher, Abortable, Closeable {
061  private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class);
062
063  // Identifier for this watcher (for logging only).  It is made of the prefix
064  // passed on construction and the zookeeper sessionid.
065  private String prefix;
066  private String identifier;
067
068  // zookeeper quorum
069  private String quorum;
070
071  // zookeeper connection
072  private final RecoverableZooKeeper recoverableZooKeeper;
073
074  // abortable in case of zk failure
075  protected Abortable abortable;
076  // Used if abortable is null
077  private boolean aborted = false;
078
079  public final ZNodePaths znodePaths;
080
081  // listeners to be notified
082  private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();
083
084  // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
085  // negotiation to complete
086  public CountDownLatch saslLatch = new CountDownLatch(1);
087
088  private final Configuration conf;
089
090  /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
091  private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
092
093  /**
094   * Instantiate a ZooKeeper connection and watcher.
095   * @param identifier string that is passed to RecoverableZookeeper to be used as
096   *                   identifier for this instance. Use null for default.
097   * @throws IOException if the connection to ZooKeeper fails
098   * @throws ZooKeeperConnectionException
099   */
100  public ZKWatcher(Configuration conf, String identifier,
101                   Abortable abortable) throws ZooKeeperConnectionException, IOException {
102    this(conf, identifier, abortable, false);
103  }
104
105  /**
106   * Instantiate a ZooKeeper connection and watcher.
107   * @param conf the configuration to use
108   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
109   *          this instance. Use null for default.
110   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
111   *          context.
112   * @param canCreateBaseZNode true if a base ZNode can be created
113   * @throws IOException if the connection to ZooKeeper fails
114   * @throws ZooKeeperConnectionException
115   */
116  public ZKWatcher(Configuration conf, String identifier,
117                   Abortable abortable, boolean canCreateBaseZNode)
118  throws IOException, ZooKeeperConnectionException {
119    this.conf = conf;
120    this.quorum = ZKConfig.getZKQuorumServersString(conf);
121    this.prefix = identifier;
122    // Identifier will get the sessionid appended later below down when we
123    // handle the syncconnect event.
124    this.identifier = identifier + "0x0";
125    this.abortable = abortable;
126    this.znodePaths = new ZNodePaths(conf);
127    PendingWatcher pendingWatcher = new PendingWatcher();
128    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
129    pendingWatcher.prepare(this);
130    if (canCreateBaseZNode) {
131      try {
132        createBaseZNodes();
133      } catch (ZooKeeperConnectionException zce) {
134        try {
135          this.recoverableZooKeeper.close();
136        } catch (InterruptedException ie) {
137          LOG.debug("Encountered InterruptedException when closing " + this.recoverableZooKeeper);
138          Thread.currentThread().interrupt();
139        }
140        throw zce;
141      }
142    }
143  }
144
145  private void createBaseZNodes() throws ZooKeeperConnectionException {
146    try {
147      // Create all the necessary "directories" of znodes
148      ZKUtil.createWithParents(this, znodePaths.baseZNode);
149      ZKUtil.createAndFailSilent(this, znodePaths.rsZNode);
150      ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode);
151      ZKUtil.createAndFailSilent(this, znodePaths.tableZNode);
152      ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode);
153      ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode);
154      ZKUtil.createAndFailSilent(this, znodePaths.tableLockZNode);
155      ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode);
156    } catch (KeeperException e) {
157      throw new ZooKeeperConnectionException(
158          prefix("Unexpected KeeperException creating base node"), e);
159    }
160  }
161
162  /**
163   * On master start, we check the znode ACLs under the root directory and set the ACLs properly
164   * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
165   * so that the existing znodes created with open permissions are now changed with restrictive
166   * perms.
167   */
168  public void checkAndSetZNodeAcls() {
169    if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
170      LOG.info("not a secure deployment, proceeding");
171      return;
172    }
173
174    // Check the base znodes permission first. Only do the recursion if base znode's perms are not
175    // correct.
176    try {
177      List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat());
178
179      if (!isBaseZnodeAclSetup(actualAcls)) {
180        LOG.info("setting znode ACLs");
181        setZnodeAclsRecursive(znodePaths.baseZNode);
182      }
183    } catch(KeeperException.NoNodeException nne) {
184      return;
185    } catch(InterruptedException ie) {
186      interruptedExceptionNoThrow(ie, false);
187    } catch (IOException|KeeperException e) {
188      LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
189    }
190  }
191
192  /**
193   * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
194   * will be set last in case the master fails in between.
195   * @param znode the ZNode to set the permissions for
196   */
197  private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
198    List<String> children = recoverableZooKeeper.getChildren(znode, false);
199
200    for (String child : children) {
201      setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child));
202    }
203    List<ACL> acls = ZKUtil.createACL(this, znode, true);
204    LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
205    recoverableZooKeeper.setAcl(znode, acls, -1);
206  }
207
208  /**
209   * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
210   * @param acls acls from zookeeper
211   * @return whether ACLs are set for the base znode
212   * @throws IOException if getting the current user fails
213   */
214  private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
215    if (LOG.isDebugEnabled()) {
216      LOG.debug("Checking znode ACLs");
217    }
218    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
219    // Check whether ACL set for all superusers
220    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
221      return false;
222    }
223
224    // this assumes that current authenticated user is the same as zookeeper client user
225    // configured via JAAS
226    String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
227
228    if (acls.isEmpty()) {
229      if (LOG.isDebugEnabled()) {
230        LOG.debug("ACL is empty");
231      }
232      return false;
233    }
234
235    for (ACL acl : acls) {
236      int perms = acl.getPerms();
237      Id id = acl.getId();
238      // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
239      // and one for the hbase user
240      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
241        if (perms != Perms.READ) {
242          if (LOG.isDebugEnabled()) {
243            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
244              id, perms, Perms.READ));
245          }
246          return false;
247        }
248      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
249        if (perms != Perms.ALL) {
250          if (LOG.isDebugEnabled()) {
251            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
252              id, perms, Perms.ALL));
253          }
254          return false;
255        }
256      } else if ("sasl".equals(id.getScheme())) {
257        String name = id.getId();
258        // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
259        Matcher match = NAME_PATTERN.matcher(name);
260        if (match.matches()) {
261          name = match.group(1);
262        }
263        if (name.equals(hbaseUser)) {
264          if (perms != Perms.ALL) {
265            if (LOG.isDebugEnabled()) {
266              LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
267                id, perms, Perms.ALL));
268            }
269            return false;
270          }
271        } else {
272          if (LOG.isDebugEnabled()) {
273            LOG.debug("Unexpected shortname in SASL ACL: " + id);
274          }
275          return false;
276        }
277      } else {
278        if (LOG.isDebugEnabled()) {
279          LOG.debug("unexpected ACL id '" + id + "'");
280        }
281        return false;
282      }
283    }
284    return true;
285  }
286
287  /*
288   * Validate whether ACL set for all superusers.
289   */
290  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
291    for (String user : superUsers) {
292      boolean hasAccess = false;
293      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
294      if (!AuthUtil.isGroupPrincipal(user)) {
295        for (ACL acl : acls) {
296          if (user.equals(acl.getId().getId())) {
297            if (acl.getPerms() == Perms.ALL) {
298              hasAccess = true;
299            } else {
300              if (LOG.isDebugEnabled()) {
301                LOG.debug(String.format(
302                  "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
303                  acl.getId().getId(), acl.getPerms(), Perms.ALL));
304              }
305            }
306            break;
307          }
308        }
309        if (!hasAccess) {
310          return false;
311        }
312      }
313    }
314    return true;
315  }
316
317  /*
318   * Validate whether ACL ID is superuser.
319   */
320  public static boolean isSuperUserId(String[] superUsers, Id id) {
321    for (String user : superUsers) {
322      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
323      if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) {
324        return true;
325      }
326    }
327    return false;
328  }
329
330  @Override
331  public String toString() {
332    return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode;
333  }
334
335  /**
336   * Adds this instance's identifier as a prefix to the passed <code>str</code>
337   * @param str String to amend.
338   * @return A new string with this instance's identifier as prefix: e.g.
339   *         if passed 'hello world', the returned string could be
340   */
341  public String prefix(final String str) {
342    return this.toString() + " " + str;
343  }
344
345  /**
346   * Get the znodes corresponding to the meta replicas from ZK
347   * @return list of znodes
348   * @throws KeeperException if a ZooKeeper operation fails
349   */
350  public List<String> getMetaReplicaNodes() throws KeeperException {
351    List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode);
352    List<String> metaReplicaNodes = new ArrayList<>(2);
353    if (childrenOfBaseNode != null) {
354      String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
355      for (String child : childrenOfBaseNode) {
356        if (child.startsWith(pattern)) {
357          metaReplicaNodes.add(child);
358        }
359      }
360    }
361    return metaReplicaNodes;
362  }
363
364  /**
365   * Register the specified listener to receive ZooKeeper events.
366   * @param listener the listener to register
367   */
368  public void registerListener(ZKListener listener) {
369    listeners.add(listener);
370  }
371
372  /**
373   * Register the specified listener to receive ZooKeeper events and add it as
374   * the first in the list of current listeners.
375   * @param listener the listener to register
376   */
377  public void registerListenerFirst(ZKListener listener) {
378    listeners.add(0, listener);
379  }
380
381  public void unregisterListener(ZKListener listener) {
382    listeners.remove(listener);
383  }
384
385  /**
386   * Clean all existing listeners
387   */
388  public void unregisterAllListeners() {
389    listeners.clear();
390  }
391
392  /**
393   * Get a copy of current registered listeners
394   */
395  public List<ZKListener> getListeners() {
396    return new ArrayList<>(listeners);
397  }
398
399  /**
400   * @return The number of currently registered listeners
401   */
402  public int getNumberOfListeners() {
403    return listeners.size();
404  }
405
406  /**
407   * Get the connection to ZooKeeper.
408   * @return connection reference to zookeeper
409   */
410  public RecoverableZooKeeper getRecoverableZooKeeper() {
411    return recoverableZooKeeper;
412  }
413
414  public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
415    recoverableZooKeeper.reconnectAfterExpiration();
416  }
417
418  /**
419   * Get the quorum address of this instance.
420   * @return quorum string of this zookeeper connection instance
421   */
422  public String getQuorum() {
423    return quorum;
424  }
425
426  /**
427   * Get the znodePaths.
428   * <p>
429   * Mainly used for mocking as mockito can not mock a field access.
430   */
431  public ZNodePaths getZNodePaths() {
432    return znodePaths;
433  }
434
435  /**
436   * Method called from ZooKeeper for events and connection status.
437   * <p>
438   * Valid events are passed along to listeners.  Connection status changes
439   * are dealt with locally.
440   */
441  @Override
442  public void process(WatchedEvent event) {
443    LOG.debug(prefix("Received ZooKeeper Event, " +
444        "type=" + event.getType() + ", " +
445        "state=" + event.getState() + ", " +
446        "path=" + event.getPath()));
447
448    switch(event.getType()) {
449
450      // If event type is NONE, this is a connection status change
451      case None: {
452        connectionEvent(event);
453        break;
454      }
455
456      // Otherwise pass along to the listeners
457
458      case NodeCreated: {
459        for(ZKListener listener : listeners) {
460          listener.nodeCreated(event.getPath());
461        }
462        break;
463      }
464
465      case NodeDeleted: {
466        for(ZKListener listener : listeners) {
467          listener.nodeDeleted(event.getPath());
468        }
469        break;
470      }
471
472      case NodeDataChanged: {
473        for(ZKListener listener : listeners) {
474          listener.nodeDataChanged(event.getPath());
475        }
476        break;
477      }
478
479      case NodeChildrenChanged: {
480        for(ZKListener listener : listeners) {
481          listener.nodeChildrenChanged(event.getPath());
482        }
483        break;
484      }
485    }
486  }
487
488  // Connection management
489
490  /**
491   * Called when there is a connection-related event via the Watcher callback.
492   * <p>
493   * If Disconnected or Expired, this should shutdown the cluster. But, since
494   * we send a KeeperException.SessionExpiredException along with the abort
495   * call, it's possible for the Abortable to catch it and try to create a new
496   * session with ZooKeeper. This is what the client does in HCM.
497   * <p>
498   * @param event the connection-related event
499   */
500  private void connectionEvent(WatchedEvent event) {
501    switch(event.getState()) {
502      case SyncConnected:
503        this.identifier = this.prefix + "-0x" +
504          Long.toHexString(this.recoverableZooKeeper.getSessionId());
505        // Update our identifier.  Otherwise ignore.
506        LOG.debug(this.identifier + " connected");
507        break;
508
509      // Abort the server if Disconnected or Expired
510      case Disconnected:
511        LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
512        break;
513
514      case Expired:
515        String msg = prefix(this.identifier + " received expired from " +
516          "ZooKeeper, aborting");
517        // TODO: One thought is to add call to ZKListener so say,
518        // ZKNodeTracker can zero out its data values.
519        if (this.abortable != null) {
520          this.abortable.abort(msg, new KeeperException.SessionExpiredException());
521        }
522        break;
523
524      case ConnectedReadOnly:
525      case SaslAuthenticated:
526      case AuthFailed:
527        break;
528
529      default:
530        throw new IllegalStateException("Received event is not valid: " + event.getState());
531    }
532  }
533
534  /**
535   * Forces a synchronization of this ZooKeeper client connection.
536   * <p>
537   * Executing this method before running other methods will ensure that the
538   * subsequent operations are up-to-date and consistent as of the time that
539   * the sync is complete.
540   * <p>
541   * This is used for compareAndSwap type operations where we need to read the
542   * data of an existing node and delete or transition that node, utilizing the
543   * previously read version and data.  We want to ensure that the version read
544   * is up-to-date from when we begin the operation.
545   */
546  public void sync(String path) throws KeeperException {
547    this.recoverableZooKeeper.sync(path, null, null);
548  }
549
550  /**
551   * Handles KeeperExceptions in client calls.
552   * <p>
553   * This may be temporary but for now this gives one place to deal with these.
554   * <p>
555   * TODO: Currently this method rethrows the exception to let the caller handle
556   * <p>
557   * @param ke the exception to rethrow
558   * @throws KeeperException if a ZooKeeper operation fails
559   */
560  public void keeperException(KeeperException ke) throws KeeperException {
561    LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
562    throw ke;
563  }
564
565  /**
566   * Handles InterruptedExceptions in client calls.
567   * @param ie the InterruptedException instance thrown
568   * @throws KeeperException the exception to throw, transformed from the InterruptedException
569   */
570  public void interruptedException(InterruptedException ie) throws KeeperException {
571    interruptedExceptionNoThrow(ie, true);
572    // Throw a system error exception to let upper level handle it
573    throw new KeeperException.SystemErrorException();
574  }
575
576  /**
577   * Log the InterruptedException and interrupt current thread
578   * @param ie The IterruptedException to log
579   * @param throwLater Whether we will throw the exception latter
580   */
581  public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
582    LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
583        + (throwLater ? " and rethrow a SystemErrorException" : "")),
584      ie);
585    // At least preserve interrupt.
586    Thread.currentThread().interrupt();
587  }
588
589  /**
590   * Close the connection to ZooKeeper.
591   *
592   */
593  @Override
594  public void close() {
595    try {
596      recoverableZooKeeper.close();
597    } catch (InterruptedException e) {
598      Thread.currentThread().interrupt();
599    }
600  }
601
602  public Configuration getConfiguration() {
603    return conf;
604  }
605
606  @Override
607  public void abort(String why, Throwable e) {
608    if (this.abortable != null) {
609      this.abortable.abort(why, e);
610    } else {
611      this.aborted = true;
612    }
613  }
614
615  @Override
616  public boolean isAborted() {
617    return this.abortable == null? this.aborted: this.abortable.isAborted();
618  }
619}