001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.CountDownLatch;
027import java.util.regex.Matcher;
028import java.util.regex.Pattern;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Abortable;
032import org.apache.hadoop.hbase.AuthUtil;
033import org.apache.hadoop.hbase.ZooKeeperConnectionException;
034import org.apache.hadoop.hbase.security.Superusers;
035import org.apache.hadoop.security.UserGroupInformation;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.zookeeper.KeeperException;
038import org.apache.zookeeper.WatchedEvent;
039import org.apache.zookeeper.Watcher;
040import org.apache.zookeeper.ZooDefs.Ids;
041import org.apache.zookeeper.ZooDefs.Perms;
042import org.apache.zookeeper.data.ACL;
043import org.apache.zookeeper.data.Id;
044import org.apache.zookeeper.data.Stat;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
050 * for each Master, RegionServer, and client process.
051 *
052 * <p>This is the only class that implements {@link Watcher}.  Other internal
053 * classes which need to be notified of ZooKeeper events must register with
054 * the local instance of this watcher via {@link #registerListener}.
055 *
056 * <p>This class also holds and manages the connection to ZooKeeper.  Code to
057 * deal with connection related events and exceptions are handled here.
058 */
059@InterfaceAudience.Private
060public class ZKWatcher implements Watcher, Abortable, Closeable {
061  private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class);
062
063  // Identifier for this watcher (for logging only).  It is made of the prefix
064  // passed on construction and the zookeeper sessionid.
065  private String prefix;
066  private String identifier;
067
068  // zookeeper quorum
069  private String quorum;
070
071  // zookeeper connection
072  private final RecoverableZooKeeper recoverableZooKeeper;
073
074  // abortable in case of zk failure
075  protected Abortable abortable;
076  // Used if abortable is null
077  private boolean aborted = false;
078
079  private final ZNodePaths znodePaths;
080
081  // listeners to be notified
082  private final List<ZKListener> listeners = new CopyOnWriteArrayList<>();
083
084  // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
085  // negotiation to complete
086  private CountDownLatch saslLatch = new CountDownLatch(1);
087
088  private final Configuration conf;
089
090  /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
091  private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
092
093  /**
094   * Instantiate a ZooKeeper connection and watcher.
095   * @param identifier string that is passed to RecoverableZookeeper to be used as
096   *                   identifier for this instance. Use null for default.
097   * @throws IOException if the connection to ZooKeeper fails
098   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
099   */
100  public ZKWatcher(Configuration conf, String identifier,
101                   Abortable abortable) throws ZooKeeperConnectionException, IOException {
102    this(conf, identifier, abortable, false);
103  }
104
105  /**
106   * Instantiate a ZooKeeper connection and watcher.
107   * @param conf the configuration to use
108   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
109   *          this instance. Use null for default.
110   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
111   *          context.
112   * @param canCreateBaseZNode true if a base ZNode can be created
113   * @throws IOException if the connection to ZooKeeper fails
114   * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper
115   */
116  public ZKWatcher(Configuration conf, String identifier,
117                   Abortable abortable, boolean canCreateBaseZNode)
118    throws IOException, ZooKeeperConnectionException {
119    this(conf, identifier, abortable, canCreateBaseZNode, false);
120  }
121
122  /**
123   * Instantiate a ZooKeeper connection and watcher.
124   * @param conf the configuration to use
125   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
126   *          this instance. Use null for default.
127   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
128   *          context.
129   * @param canCreateBaseZNode true if a base ZNode can be created
130   * @param clientZK whether this watcher is set to access client ZK
131   * @throws IOException if the connection to ZooKeeper fails
132   * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
133   *           ZNodes
134   */
135  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
136      boolean canCreateBaseZNode, boolean clientZK)
137      throws IOException, ZooKeeperConnectionException {
138    this.conf = conf;
139    if (clientZK) {
140      String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
141      String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
142      if (clientZkQuorumServers != null) {
143        if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
144          // Don't allow same settings to avoid dead loop when master trying
145          // to sync meta information from server ZK to client ZK
146          throw new IllegalArgumentException(
147              "The quorum settings for client ZK should be different from those for server");
148        }
149        this.quorum = clientZkQuorumServers;
150      } else {
151        this.quorum = serverZkQuorumServers;
152      }
153    } else {
154      this.quorum = ZKConfig.getZKQuorumServersString(conf);
155    }
156    this.prefix = identifier;
157    // Identifier will get the sessionid appended later below down when we
158    // handle the syncconnect event.
159    this.identifier = identifier + "0x0";
160    this.abortable = abortable;
161    this.znodePaths = new ZNodePaths(conf);
162    PendingWatcher pendingWatcher = new PendingWatcher();
163    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
164    pendingWatcher.prepare(this);
165    if (canCreateBaseZNode) {
166      try {
167        createBaseZNodes();
168      } catch (ZooKeeperConnectionException zce) {
169        try {
170          this.recoverableZooKeeper.close();
171        } catch (InterruptedException ie) {
172          LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper);
173          Thread.currentThread().interrupt();
174        }
175        throw zce;
176      }
177    }
178  }
179
180  private void createBaseZNodes() throws ZooKeeperConnectionException {
181    try {
182      // Create all the necessary "directories" of znodes
183      ZKUtil.createWithParents(this, znodePaths.baseZNode);
184      ZKUtil.createAndFailSilent(this, znodePaths.rsZNode);
185      ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode);
186      ZKUtil.createAndFailSilent(this, znodePaths.tableZNode);
187      ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode);
188      ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode);
189      ZKUtil.createAndFailSilent(this, znodePaths.tableLockZNode);
190      ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode);
191    } catch (KeeperException e) {
192      throw new ZooKeeperConnectionException(
193          prefix("Unexpected KeeperException creating base node"), e);
194    }
195  }
196
197  /**
198   * On master start, we check the znode ACLs under the root directory and set the ACLs properly
199   * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
200   * so that the existing znodes created with open permissions are now changed with restrictive
201   * perms.
202   */
203  public void checkAndSetZNodeAcls() {
204    if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
205      LOG.info("not a secure deployment, proceeding");
206      return;
207    }
208
209    // Check the base znodes permission first. Only do the recursion if base znode's perms are not
210    // correct.
211    try {
212      List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat());
213
214      if (!isBaseZnodeAclSetup(actualAcls)) {
215        LOG.info("setting znode ACLs");
216        setZnodeAclsRecursive(znodePaths.baseZNode);
217      }
218    } catch(KeeperException.NoNodeException nne) {
219      return;
220    } catch(InterruptedException ie) {
221      interruptedExceptionNoThrow(ie, false);
222    } catch (IOException|KeeperException e) {
223      LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
224    }
225  }
226
227  /**
228   * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
229   * will be set last in case the master fails in between.
230   * @param znode the ZNode to set the permissions for
231   */
232  private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
233    List<String> children = recoverableZooKeeper.getChildren(znode, false);
234
235    for (String child : children) {
236      setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child));
237    }
238    List<ACL> acls = ZKUtil.createACL(this, znode, true);
239    LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls);
240    recoverableZooKeeper.setAcl(znode, acls, -1);
241  }
242
243  /**
244   * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
245   * @param acls acls from zookeeper
246   * @return whether ACLs are set for the base znode
247   * @throws IOException if getting the current user fails
248   */
249  private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
250    if (LOG.isDebugEnabled()) {
251      LOG.debug("Checking znode ACLs");
252    }
253    String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
254    // Check whether ACL set for all superusers
255    if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
256      return false;
257    }
258
259    // this assumes that current authenticated user is the same as zookeeper client user
260    // configured via JAAS
261    String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
262
263    if (acls.isEmpty()) {
264      if (LOG.isDebugEnabled()) {
265        LOG.debug("ACL is empty");
266      }
267      return false;
268    }
269
270    for (ACL acl : acls) {
271      int perms = acl.getPerms();
272      Id id = acl.getId();
273      // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
274      // and one for the hbase user
275      if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
276        if (perms != Perms.READ) {
277          if (LOG.isDebugEnabled()) {
278            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
279              id, perms, Perms.READ));
280          }
281          return false;
282        }
283      } else if (superUsers != null && isSuperUserId(superUsers, id)) {
284        if (perms != Perms.ALL) {
285          if (LOG.isDebugEnabled()) {
286            LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
287              id, perms, Perms.ALL));
288          }
289          return false;
290        }
291      } else if ("sasl".equals(id.getScheme())) {
292        String name = id.getId();
293        // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
294        Matcher match = NAME_PATTERN.matcher(name);
295        if (match.matches()) {
296          name = match.group(1);
297        }
298        if (name.equals(hbaseUser)) {
299          if (perms != Perms.ALL) {
300            if (LOG.isDebugEnabled()) {
301              LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
302                id, perms, Perms.ALL));
303            }
304            return false;
305          }
306        } else {
307          if (LOG.isDebugEnabled()) {
308            LOG.debug("Unexpected shortname in SASL ACL: {}", id);
309          }
310          return false;
311        }
312      } else {
313        if (LOG.isDebugEnabled()) {
314          LOG.debug("unexpected ACL id '{}'", id);
315        }
316        return false;
317      }
318    }
319    return true;
320  }
321
322  /*
323   * Validate whether ACL set for all superusers.
324   */
325  private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
326    for (String user : superUsers) {
327      boolean hasAccess = false;
328      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
329      if (!AuthUtil.isGroupPrincipal(user)) {
330        for (ACL acl : acls) {
331          if (user.equals(acl.getId().getId())) {
332            if (acl.getPerms() == Perms.ALL) {
333              hasAccess = true;
334            } else {
335              if (LOG.isDebugEnabled()) {
336                LOG.debug(String.format(
337                  "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
338                  acl.getId().getId(), acl.getPerms(), Perms.ALL));
339              }
340            }
341            break;
342          }
343        }
344        if (!hasAccess) {
345          return false;
346        }
347      }
348    }
349    return true;
350  }
351
352  /*
353   * Validate whether ACL ID is superuser.
354   */
355  public static boolean isSuperUserId(String[] superUsers, Id id) {
356    for (String user : superUsers) {
357      // TODO: Validate super group members also when ZK supports setting node ACL for groups.
358      if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) {
359        return true;
360      }
361    }
362    return false;
363  }
364
365  @Override
366  public String toString() {
367    return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode;
368  }
369
370  /**
371   * Adds this instance's identifier as a prefix to the passed <code>str</code>
372   * @param str String to amend.
373   * @return A new string with this instance's identifier as prefix: e.g.
374   *         if passed 'hello world', the returned string could be
375   */
376  public String prefix(final String str) {
377    return this.toString() + " " + str;
378  }
379
380  /**
381   * Get the znodes corresponding to the meta replicas from ZK
382   * @return list of znodes
383   * @throws KeeperException if a ZooKeeper operation fails
384   */
385  public List<String> getMetaReplicaNodes() throws KeeperException {
386    List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode);
387    List<String> metaReplicaNodes = new ArrayList<>(2);
388    if (childrenOfBaseNode != null) {
389      String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
390      for (String child : childrenOfBaseNode) {
391        if (child.startsWith(pattern)) {
392          metaReplicaNodes.add(child);
393        }
394      }
395    }
396    return metaReplicaNodes;
397  }
398
399  /**
400   * Register the specified listener to receive ZooKeeper events.
401   * @param listener the listener to register
402   */
403  public void registerListener(ZKListener listener) {
404    listeners.add(listener);
405  }
406
407  /**
408   * Register the specified listener to receive ZooKeeper events and add it as
409   * the first in the list of current listeners.
410   * @param listener the listener to register
411   */
412  public void registerListenerFirst(ZKListener listener) {
413    listeners.add(0, listener);
414  }
415
416  public void unregisterListener(ZKListener listener) {
417    listeners.remove(listener);
418  }
419
420  /**
421   * Clean all existing listeners
422   */
423  public void unregisterAllListeners() {
424    listeners.clear();
425  }
426
427  /**
428   * Get a copy of current registered listeners
429   */
430  public List<ZKListener> getListeners() {
431    return new ArrayList<>(listeners);
432  }
433
434  /**
435   * @return The number of currently registered listeners
436   */
437  public int getNumberOfListeners() {
438    return listeners.size();
439  }
440
441  /**
442   * Get the connection to ZooKeeper.
443   * @return connection reference to zookeeper
444   */
445  public RecoverableZooKeeper getRecoverableZooKeeper() {
446    return recoverableZooKeeper;
447  }
448
449  public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
450    recoverableZooKeeper.reconnectAfterExpiration();
451  }
452
453  /**
454   * Get the quorum address of this instance.
455   * @return quorum string of this zookeeper connection instance
456   */
457  public String getQuorum() {
458    return quorum;
459  }
460
461  /**
462   * Get the znodePaths.
463   * <p>
464   * Mainly used for mocking as mockito can not mock a field access.
465   */
466  public ZNodePaths getZNodePaths() {
467    return znodePaths;
468  }
469
470  /**
471   * Method called from ZooKeeper for events and connection status.
472   * <p>
473   * Valid events are passed along to listeners.  Connection status changes
474   * are dealt with locally.
475   */
476  @Override
477  public void process(WatchedEvent event) {
478    LOG.debug(prefix("Received ZooKeeper Event, " +
479        "type=" + event.getType() + ", " +
480        "state=" + event.getState() + ", " +
481        "path=" + event.getPath()));
482
483    switch(event.getType()) {
484
485      // If event type is NONE, this is a connection status change
486      case None: {
487        connectionEvent(event);
488        break;
489      }
490
491      // Otherwise pass along to the listeners
492
493      case NodeCreated: {
494        for(ZKListener listener : listeners) {
495          listener.nodeCreated(event.getPath());
496        }
497        break;
498      }
499
500      case NodeDeleted: {
501        for(ZKListener listener : listeners) {
502          listener.nodeDeleted(event.getPath());
503        }
504        break;
505      }
506
507      case NodeDataChanged: {
508        for(ZKListener listener : listeners) {
509          listener.nodeDataChanged(event.getPath());
510        }
511        break;
512      }
513
514      case NodeChildrenChanged: {
515        for(ZKListener listener : listeners) {
516          listener.nodeChildrenChanged(event.getPath());
517        }
518        break;
519      }
520      default:
521        throw new IllegalStateException("Received event is not valid: " + event.getState());
522    }
523  }
524
525  // Connection management
526
527  /**
528   * Called when there is a connection-related event via the Watcher callback.
529   * <p>
530   * If Disconnected or Expired, this should shutdown the cluster. But, since
531   * we send a KeeperException.SessionExpiredException along with the abort
532   * call, it's possible for the Abortable to catch it and try to create a new
533   * session with ZooKeeper. This is what the client does in HCM.
534   * <p>
535   * @param event the connection-related event
536   */
537  private void connectionEvent(WatchedEvent event) {
538    switch(event.getState()) {
539      case SyncConnected:
540        this.identifier = this.prefix + "-0x" +
541          Long.toHexString(this.recoverableZooKeeper.getSessionId());
542        // Update our identifier.  Otherwise ignore.
543        LOG.debug("{} connected", this.identifier);
544        break;
545
546      // Abort the server if Disconnected or Expired
547      case Disconnected:
548        LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
549        break;
550
551      case Expired:
552        String msg = prefix(this.identifier + " received expired from " +
553          "ZooKeeper, aborting");
554        // TODO: One thought is to add call to ZKListener so say,
555        // ZKNodeTracker can zero out its data values.
556        if (this.abortable != null) {
557          this.abortable.abort(msg, new KeeperException.SessionExpiredException());
558        }
559        break;
560
561      case ConnectedReadOnly:
562      case SaslAuthenticated:
563      case AuthFailed:
564        break;
565
566      default:
567        throw new IllegalStateException("Received event is not valid: " + event.getState());
568    }
569  }
570
571  /**
572   * Forces a synchronization of this ZooKeeper client connection.
573   * <p>
574   * Executing this method before running other methods will ensure that the
575   * subsequent operations are up-to-date and consistent as of the time that
576   * the sync is complete.
577   * <p>
578   * This is used for compareAndSwap type operations where we need to read the
579   * data of an existing node and delete or transition that node, utilizing the
580   * previously read version and data.  We want to ensure that the version read
581   * is up-to-date from when we begin the operation.
582   */
583  public void sync(String path) throws KeeperException {
584    this.recoverableZooKeeper.sync(path, null, null);
585  }
586
587  /**
588   * Handles KeeperExceptions in client calls.
589   * <p>
590   * This may be temporary but for now this gives one place to deal with these.
591   * <p>
592   * TODO: Currently this method rethrows the exception to let the caller handle
593   * <p>
594   * @param ke the exception to rethrow
595   * @throws KeeperException if a ZooKeeper operation fails
596   */
597  public void keeperException(KeeperException ke) throws KeeperException {
598    LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
599    throw ke;
600  }
601
602  /**
603   * Handles InterruptedExceptions in client calls.
604   * @param ie the InterruptedException instance thrown
605   * @throws KeeperException the exception to throw, transformed from the InterruptedException
606   */
607  public void interruptedException(InterruptedException ie) throws KeeperException {
608    interruptedExceptionNoThrow(ie, true);
609    // Throw a system error exception to let upper level handle it
610    KeeperException keeperException = new KeeperException.SystemErrorException();
611    keeperException.initCause(ie);
612    throw keeperException;
613  }
614
615  /**
616   * Log the InterruptedException and interrupt current thread
617   * @param ie The IterruptedException to log
618   * @param throwLater Whether we will throw the exception latter
619   */
620  public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) {
621    LOG.debug(prefix("Received InterruptedException, will interrupt current thread"
622        + (throwLater ? " and rethrow a SystemErrorException" : "")),
623      ie);
624    // At least preserve interrupt.
625    Thread.currentThread().interrupt();
626  }
627
628  /**
629   * Close the connection to ZooKeeper.
630   *
631   */
632  @Override
633  public void close() {
634    try {
635      recoverableZooKeeper.close();
636    } catch (InterruptedException e) {
637      Thread.currentThread().interrupt();
638    }
639  }
640
641  public Configuration getConfiguration() {
642    return conf;
643  }
644
645  @Override
646  public void abort(String why, Throwable e) {
647    if (this.abortable != null) {
648      this.abortable.abort(why, e);
649    } else {
650      this.aborted = true;
651    }
652  }
653
654  @Override
655  public boolean isAborted() {
656    return this.abortable == null? this.aborted: this.abortable.isAborted();
657  }
658}