001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.CountDownLatch;
027import java.util.regex.Matcher;
028import java.util.regex.Pattern;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.AuthUtil;
033import org.apache.hadoop.hbase.ZooKeeperConnectionException;
034import org.apache.hadoop.hbase.security.Superusers;
035import org.apache.hadoop.security.UserGroupInformation;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.zookeeper.KeeperException;
038import org.apache.zookeeper.WatchedEvent;
039import org.apache.zookeeper.Watcher;
040import org.apache.zookeeper.ZooDefs.Ids;
041import org.apache.zookeeper.ZooDefs.Perms;
042import org.apache.zookeeper.data.ACL;
043import org.apache.zookeeper.data.Id;
044import org.apache.zookeeper.data.Stat;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
050 * for each Master, RegionServer, and client process.
051 *
052 * <p>This is the only class that implements {@link Watcher}.  Other internal
053 * classes which need to be notified of ZooKeeper events must register with
054 * the local instance of this watcher via {@link #registerListener}.
055 *
056 * <p>This class also holds and manages the connection to ZooKeeper.  Code to
057 * deal with connection related events and exceptions are handled here.
058 */
059@InterfaceAudience.Private
060public class ZKWatcher implements Watcher, Abortable, Closeable {
061  private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class);
062
063  // Identifier for this watcher (for logging only).  It is made of the prefix
064  // passed on construction and the zookeeper sessionid.
065  private String prefix;
066  private String identifier;
067
068  // zookeeper quorum
069  private String quorum;
070
071  // zookeeper connection
072  private final RecoverableZooKeeper recoverableZooKeeper;
073
074  // abortable in case of zk failure
075  protected Abortable abortable;
076  // Used if abortable is null
077  private boolean aborted = false;
078
079  private final ZNodePaths znodePaths;
080
081  // listeners to be notified
082  private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();
083
084  // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
085  // negotiation to complete
086  private CountDownLatch saslLatch = new CountDownLatch(1);
087
088  private final Configuration conf;
089
090  /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
091  private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
092
093  /**
094   * Instantiate a ZooKeeper connection and watcher.
095   * @param identifier string that is passed to RecoverableZookeeper to be used as
096   *                   identifier for this instance. Use null for default.
097   * @throws IOException if the connection to ZooKeeper fails
098   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
099   */
100  public ZKWatcher(Configuration conf, String identifier,
101                   Abortable abortable) throws ZooKeeperConnectionException, IOException {
102    this(conf, identifier, abortable, false);
103  }
104
105  /**
106   * Instantiate a ZooKeeper connection and watcher.
107   * @param conf the configuration to use
108   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
109   *          this instance. Use null for default.
110   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
111   *          context.
112   * @param canCreateBaseZNode true if a base ZNode can be created
113   * @throws IOException if the connection to ZooKeeper fails
114   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
115   */
116  public ZKWatcher(Configuration conf, String identifier,
117                   Abortable abortable, boolean canCreateBaseZNode)
118    throws IOException, ZooKeeperConnectionException {
119    this(conf, identifier, abortable, canCreateBaseZNode, false);
120  }
121
122  /**
123   * Instantiate a ZooKeeper connection and watcher.
124   * @param conf the configuration to use
125   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
126   *          this instance. Use null for default.
127   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
128   *          context.
129   * @param canCreateBaseZNode true if a base ZNode can be created
130   * @param clientZK whether this watcher is set to access client ZK
131   * @throws IOException if the connection to ZooKeeper fails
132   * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
133   *           ZNodes
134   */
135  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
136      boolean canCreateBaseZNode, boolean clientZK)
137      throws IOException, ZooKeeperConnectionException {
138    this.conf = conf;
139    if (clientZK) {
140      String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
141      String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
142      if (clientZkQuorumServers != null) {
143        if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
144          // Don't allow same settings to avoid dead loop when master trying
145          // to sync meta information from server ZK to client ZK
146          throw new IllegalArgumentException(
147              "The quorum settings for client ZK should be different from those for server");
148        }
149        this.quorum = clientZkQuorumServers;
150      } else {
151        this.quorum = serverZkQuorumServers;
152      }
153    } else {
154      this.quorum = ZKConfig.getZKQuorumServersString(conf);
155    }
156    this.prefix = identifier;
157    // Identifier will get the sessionid appended later below down when we
158    // handle the syncconnect event.
159    this.identifier = identifier + "0x0";
160    this.abortable = abortable;
161    this.znodePaths = new ZNodePaths(conf);
162    PendingWatcher pendingWatcher = new PendingWatcher();
163    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
164    pendingWatcher.prepare(this);
165    if (canCreateBaseZNode) {
166      try {
167        createBaseZNodes();
168      } catch (ZooKeeperConnectionException zce) {
169        try {
170          this.recoverableZooKeeper.close();
171        } catch (InterruptedException ie) {
172          LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper);
173          Thread.currentThread().interrupt();
174        }
175        throw zce;
176      }
177    }
178  }
179
180  private void createBaseZNodes() throws ZooKeeperConnectionException {
181    try {
182      // Create all the necessary "directories" of znodes
183      ZKUtil.createWithParents(this, znodePaths.baseZNode);
184      ZKUtil.createAndFailSilent(this, znodePaths.rsZNode);
185      ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode);
186      ZKUtil.createAndFailSilent(this, znodePaths.tableZNode);
187      ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode);
188      ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode);
189      ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode);
190    } catch (KeeperException e) {
191      throw new ZooKeeperConnectionException(
192          prefix("Unexpected KeeperException creating base node"), e);
193    }
194  }
195
196  /**
197   * On master start, we check the znode ACLs under the root directory and set the ACLs properly
198   * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
199   * so that the existing znodes created with open permissions are now changed with restrictive
200   * perms.
201   */
202  public void checkAndSetZNodeAcls() {
203    if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
204      LOG.info("not a secure deployment, proceeding");
205      return;
206    }
207
208    // Check the base znodes permission first. Only do the recursion if base znode's perms are not
209    // correct.
210    try {
211      List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat());
212
213      if (!isBaseZnodeAclSetup(actualAcls)) {
214        LOG.info("setting znode ACLs");
215        setZnodeAclsRecursive(znodePaths.baseZNode);
216      }
217    } catch(KeeperException.NoNodeException nne) {
218      return;
219    } catch(InterruptedException ie) {
220      interruptedExceptionNoThrow(ie, false);
221    } catch (IOException|KeeperException e) {
222      LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
223    }
224  }
225
226  /**
227   * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
228   * will be set last in case the master fails in between.
229   * @param znode the ZNode to set the permissions for
230   */
231  private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
232    List<String> children = recoverableZooKeeper.getChildren(znode, false);
233
234    for (String child : children) {
235      setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child));
236    }
237    List<ACL> acls = ZKUtil.createACL(this, znode, true);
238    LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls);
239    recoverableZooKeeper.setAcl(znode, acls, -1);
240  }
241
242  /**
243   * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
244   * @param acls acls from zookeeper
245   * @return whether ACLs are set for the base znode
246   * @throws IOException if getting the current user fails
247   */
248  private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
249    if (LOG.isDebugEnabled()) {
250      LOG.debug("Checking znode ACLs");
251    }
252    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
253    // Check whether ACL set for all superusers
254    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
255      return false;
256    }
257
258    // this assumes that current authenticated user is the same as zookeeper client user
259    // configured via JAAS
260    String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
261
262    if (acls.isEmpty()) {
263      if (LOG.isDebugEnabled()) {
264        LOG.debug("ACL is empty");
265      }
266      return false;
267    }
268
269    for (ACL acl : acls) {
270      int perms = acl.getPerms();
271      Id id = acl.getId();
272      // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
273      // and one for the hbase user
274      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
275        if (perms != Perms.READ) {
276          if (LOG.isDebugEnabled()) {
277            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
278              id, perms, Perms.READ));
279          }
280          return false;
281        }
282      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
283        if (perms != Perms.ALL) {
284          if (LOG.isDebugEnabled()) {
285            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
286              id, perms, Perms.ALL));
287          }
288          return false;
289        }
290      } else if ("sasl".equals(id.getScheme())) {
291        String name = id.getId();
292        // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
293        Matcher match = NAME_PATTERN.matcher(name);
294        if (match.matches()) {
295          name = match.group(1);
296        }
297        if (name.equals(hbaseUser)) {
298          if (perms != Perms.ALL) {
299            if (LOG.isDebugEnabled()) {
300              LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
301                id, perms, Perms.ALL));
302            }
303            return false;
304          }
305        } else {
306          if (LOG.isDebugEnabled()) {
307            LOG.debug("Unexpected shortname in SASL ACL: {}", id);
308          }
309          return false;
310        }
311      } else {
312        if (LOG.isDebugEnabled()) {
313          LOG.debug("unexpected ACL id '{}'", id);
314        }
315        return false;
316      }
317    }
318    return true;
319  }
320
321  /*
322   * Validate whether ACL set for all superusers.
323   */
324  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
325    for (String user : superUsers) {
326      boolean hasAccess = false;
327      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
328      if (!AuthUtil.isGroupPrincipal(user)) {
329        for (ACL acl : acls) {
330          if (user.equals(acl.getId().getId())) {
331            if (acl.getPerms() == Perms.ALL) {
332              hasAccess = true;
333            } else {
334              if (LOG.isDebugEnabled()) {
335                LOG.debug(String.format(
336                  "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
337                  acl.getId().getId(), acl.getPerms(), Perms.ALL));
338              }
339            }
340            break;
341          }
342        }
343        if (!hasAccess) {
344          return false;
345        }
346      }
347    }
348    return true;
349  }
350
351  /*
352   * Validate whether ACL ID is superuser.
353   */
354  public static boolean isSuperUserId(String[] superUsers, Id id) {
355    for (String user : superUsers) {
356      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
357      if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) {
358        return true;
359      }
360    }
361    return false;
362  }
363
364  @Override
365  public String toString() {
366    return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode;
367  }
368
369  /**
370   * Adds this instance's identifier as a prefix to the passed <code>str</code>
371   * @param str String to amend.
372   * @return A new string with this instance's identifier as prefix: e.g.
373   *         if passed 'hello world', the returned string could be
374   */
375  public String prefix(final String str) {
376    return this.toString() + " " + str;
377  }
378
379  /**
380   * Get the znodes corresponding to the meta replicas from ZK
381   * @return list of znodes
382   * @throws KeeperException if a ZooKeeper operation fails
383   */
384  public List<String> getMetaReplicaNodes() throws KeeperException {
385    List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode);
386    List<String> metaReplicaNodes = new ArrayList<>(2);
387    if (childrenOfBaseNode != null) {
388      String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
389      for (String child : childrenOfBaseNode) {
390        if (child.startsWith(pattern)) {
391          metaReplicaNodes.add(child);
392        }
393      }
394    }
395    return metaReplicaNodes;
396  }
397
398  /**
399   * Register the specified listener to receive ZooKeeper events.
400   * @param listener the listener to register
401   */
402  public void registerListener(ZKListener listener) {
403    listeners.add(listener);
404  }
405
406  /**
407   * Register the specified listener to receive ZooKeeper events and add it as
408   * the first in the list of current listeners.
409   * @param listener the listener to register
410   */
411  public void registerListenerFirst(ZKListener listener) {
412    listeners.add(0, listener);
413  }
414
415  public void unregisterListener(ZKListener listener) {
416    listeners.remove(listener);
417  }
418
419  /**
420   * Clean all existing listeners
421   */
422  public void unregisterAllListeners() {
423    listeners.clear();
424  }
425
426  /**
427   * Get a copy of current registered listeners
428   */
429  public List<ZKListener> getListeners() {
430    return new ArrayList<>(listeners);
431  }
432
433  /**
434   * @return The number of currently registered listeners
435   */
436  public int getNumberOfListeners() {
437    return listeners.size();
438  }
439
440  /**
441   * Get the connection to ZooKeeper.
442   * @return connection reference to zookeeper
443   */
444  public RecoverableZooKeeper getRecoverableZooKeeper() {
445    return recoverableZooKeeper;
446  }
447
448  public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
449    recoverableZooKeeper.reconnectAfterExpiration();
450  }
451
452  /**
453   * Get the quorum address of this instance.
454   * @return quorum string of this zookeeper connection instance
455   */
456  public String getQuorum() {
457    return quorum;
458  }
459
460  /**
461   * Get the znodePaths.
462   * <p>
463   * Mainly used for mocking as mockito can not mock a field access.
464   */
465  public ZNodePaths getZNodePaths() {
466    return znodePaths;
467  }
468
469  /**
470   * Method called from ZooKeeper for events and connection status.
471   * <p>
472   * Valid events are passed along to listeners.  Connection status changes
473   * are dealt with locally.
474   */
475  @Override
476  public void process(WatchedEvent event) {
477    LOG.debug(prefix("Received ZooKeeper Event, " +
478        "type=" + event.getType() + ", " +
479        "state=" + event.getState() + ", " +
480        "path=" + event.getPath()));
481
482    switch(event.getType()) {
483
484      // If event type is NONE, this is a connection status change
485      case None: {
486        connectionEvent(event);
487        break;
488      }
489
490      // Otherwise pass along to the listeners
491
492      case NodeCreated: {
493        for(ZKListener listener : listeners) {
494          listener.nodeCreated(event.getPath());
495        }
496        break;
497      }
498
499      case NodeDeleted: {
500        for(ZKListener listener : listeners) {
501          listener.nodeDeleted(event.getPath());
502        }
503        break;
504      }
505
506      case NodeDataChanged: {
507        for(ZKListener listener : listeners) {
508          listener.nodeDataChanged(event.getPath());
509        }
510        break;
511      }
512
513      case NodeChildrenChanged: {
514        for(ZKListener listener : listeners) {
515          listener.nodeChildrenChanged(event.getPath());
516        }
517        break;
518      }
519      default:
520        throw new IllegalStateException("Received event is not valid: " + event.getState());
521    }
522  }
523
524  // Connection management
525
526  /**
527   * Called when there is a connection-related event via the Watcher callback.
528   * <p>
529   * If Disconnected or Expired, this should shutdown the cluster. But, since
530   * we send a KeeperException.SessionExpiredException along with the abort
531   * call, it's possible for the Abortable to catch it and try to create a new
532   * session with ZooKeeper. This is what the client does in HCM.
533   * <p>
534   * @param event the connection-related event
535   */
536  private void connectionEvent(WatchedEvent event) {
537    switch(event.getState()) {
538      case SyncConnected:
539        this.identifier = this.prefix + "-0x" +
540          Long.toHexString(this.recoverableZooKeeper.getSessionId());
541        // Update our identifier.  Otherwise ignore.
542        LOG.debug("{} connected", this.identifier);
543        break;
544
545      // Abort the server if Disconnected or Expired
546      case Disconnected:
547        LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
548        break;
549
550      case Expired:
551        String msg = prefix(this.identifier + " received expired from " +
552          "ZooKeeper, aborting");
553        // TODO: One thought is to add call to ZKListener so say,
554        // ZKNodeTracker can zero out its data values.
555        if (this.abortable != null) {
556          this.abortable.abort(msg, new KeeperException.SessionExpiredException());
557        }
558        break;
559
560      case ConnectedReadOnly:
561      case SaslAuthenticated:
562      case AuthFailed:
563        break;
564
565      default:
566        throw new IllegalStateException("Received event is not valid: " + event.getState());
567    }
568  }
569
570  /**
571   * Forces a synchronization of this ZooKeeper client connection.
572   * <p>
573   * Executing this method before running other methods will ensure that the
574   * subsequent operations are up-to-date and consistent as of the time that
575   * the sync is complete.
576   * <p>
577   * This is used for compareAndSwap type operations where we need to read the
578   * data of an existing node and delete or transition that node, utilizing the
579   * previously read version and data.  We want to ensure that the version read
580   * is up-to-date from when we begin the operation.
581   */
582  public void sync(String path) throws KeeperException {
583    this.recoverableZooKeeper.sync(path, null, null);
584  }
585
586  /**
587   * Handles KeeperExceptions in client calls.
588   * <p>
589   * This may be temporary but for now this gives one place to deal with these.
590   * <p>
591   * TODO: Currently this method rethrows the exception to let the caller handle
592   * <p>
593   * @param ke the exception to rethrow
594   * @throws KeeperException if a ZooKeeper operation fails
595   */
596  public void keeperException(KeeperException ke) throws KeeperException {
597    LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
598    throw ke;
599  }
600
601  /**
602   * Handles InterruptedExceptions in client calls.
603   * @param ie the InterruptedException instance thrown
604   * @throws KeeperException the exception to throw, transformed from the InterruptedException
605   */
606  public void interruptedException(InterruptedException ie) throws KeeperException {
607    interruptedExceptionNoThrow(ie, true);
608    // Throw a system error exception to let upper level handle it
609    KeeperException keeperException = new KeeperException.SystemErrorException();
610    keeperException.initCause(ie);
611    throw keeperException;
612  }
613
614  /**
615   * Log the InterruptedException and interrupt current thread
616   * @param ie The IterruptedException to log
617   * @param throwLater Whether we will throw the exception latter
618   */
619  public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
620    LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
621        + (throwLater ? " and rethrow a SystemErrorException" : "")),
622      ie);
623    // At least preserve interrupt.
624    Thread.currentThread().interrupt();
625  }
626
627  /**
628   * Close the connection to ZooKeeper.
629   *
630   */
631  @Override
632  public void close() {
633    try {
634      recoverableZooKeeper.close();
635    } catch (InterruptedException e) {
636      Thread.currentThread().interrupt();
637    }
638  }
639
640  public Configuration getConfiguration() {
641    return conf;
642  }
643
644  @Override
645  public void abort(String why, Throwable e) {
646    if (this.abortable != null) {
647      this.abortable.abort(why, e);
648    } else {
649      this.aborted = true;
650    }
651  }
652
653  @Override
654  public boolean isAborted() {
655    return this.abortable == null? this.aborted: this.abortable.isAborted();
656  }
657}