001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.concurrent.CountDownLatch; 027import java.util.regex.Matcher; 028import java.util.regex.Pattern; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Abortable; 032import org.apache.hadoop.hbase.AuthUtil; 033import org.apache.hadoop.hbase.ZooKeeperConnectionException; 034import org.apache.hadoop.hbase.security.Superusers; 035import org.apache.hadoop.security.UserGroupInformation; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.WatchedEvent; 039import org.apache.zookeeper.Watcher; 040import org.apache.zookeeper.ZooDefs.Ids; 041import org.apache.zookeeper.ZooDefs.Perms; 042import org.apache.zookeeper.data.ACL; 043import org.apache.zookeeper.data.Id; 044import org.apache.zookeeper.data.Stat; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated 050 * for each Master, RegionServer, and client process. 051 * 052 * <p>This is the only class that implements {@link Watcher}. Other internal 053 * classes which need to be notified of ZooKeeper events must register with 054 * the local instance of this watcher via {@link #registerListener}. 055 * 056 * <p>This class also holds and manages the connection to ZooKeeper. Code to 057 * deal with connection related events and exceptions are handled here. 058 */ 059@InterfaceAudience.Private 060public class ZKWatcher implements Watcher, Abortable, Closeable { 061 private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class); 062 063 // Identifier for this watcher (for logging only). It is made of the prefix 064 // passed on construction and the zookeeper sessionid. 065 private String prefix; 066 private String identifier; 067 068 // zookeeper quorum 069 private String quorum; 070 071 // zookeeper connection 072 private final RecoverableZooKeeper recoverableZooKeeper; 073 074 // abortable in case of zk failure 075 protected Abortable abortable; 076 // Used if abortable is null 077 private boolean aborted = false; 078 079 private final ZNodePaths znodePaths; 080 081 // listeners to be notified 082 private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); 083 084 // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL 085 // negotiation to complete 086 private CountDownLatch saslLatch = new CountDownLatch(1); 087 088 private final Configuration conf; 089 090 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 091 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 092 093 /** 094 * Instantiate a ZooKeeper connection and watcher. 095 * @param identifier string that is passed to RecoverableZookeeper to be used as 096 * identifier for this instance. Use null for default. 097 * @throws IOException if the connection to ZooKeeper fails 098 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 099 */ 100 public ZKWatcher(Configuration conf, String identifier, 101 Abortable abortable) throws ZooKeeperConnectionException, IOException { 102 this(conf, identifier, abortable, false); 103 } 104 105 /** 106 * Instantiate a ZooKeeper connection and watcher. 107 * @param conf the configuration to use 108 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 109 * this instance. Use null for default. 110 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 111 * context. 112 * @param canCreateBaseZNode true if a base ZNode can be created 113 * @throws IOException if the connection to ZooKeeper fails 114 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 115 */ 116 public ZKWatcher(Configuration conf, String identifier, 117 Abortable abortable, boolean canCreateBaseZNode) 118 throws IOException, ZooKeeperConnectionException { 119 this(conf, identifier, abortable, canCreateBaseZNode, false); 120 } 121 122 /** 123 * Instantiate a ZooKeeper connection and watcher. 124 * @param conf the configuration to use 125 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 126 * this instance. Use null for default. 127 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 128 * context. 129 * @param canCreateBaseZNode true if a base ZNode can be created 130 * @param clientZK whether this watcher is set to access client ZK 131 * @throws IOException if the connection to ZooKeeper fails 132 * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base 133 * ZNodes 134 */ 135 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 136 boolean canCreateBaseZNode, boolean clientZK) 137 throws IOException, ZooKeeperConnectionException { 138 this.conf = conf; 139 if (clientZK) { 140 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 141 String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf); 142 if (clientZkQuorumServers != null) { 143 if (clientZkQuorumServers.equals(serverZkQuorumServers)) { 144 // Don't allow same settings to avoid dead loop when master trying 145 // to sync meta information from server ZK to client ZK 146 throw new IllegalArgumentException( 147 "The quorum settings for client ZK should be different from those for server"); 148 } 149 this.quorum = clientZkQuorumServers; 150 } else { 151 this.quorum = serverZkQuorumServers; 152 } 153 } else { 154 this.quorum = ZKConfig.getZKQuorumServersString(conf); 155 } 156 this.prefix = identifier; 157 // Identifier will get the sessionid appended later below down when we 158 // handle the syncconnect event. 159 this.identifier = identifier + "0x0"; 160 this.abortable = abortable; 161 this.znodePaths = new ZNodePaths(conf); 162 PendingWatcher pendingWatcher = new PendingWatcher(); 163 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); 164 pendingWatcher.prepare(this); 165 if (canCreateBaseZNode) { 166 try { 167 createBaseZNodes(); 168 } catch (ZooKeeperConnectionException zce) { 169 try { 170 this.recoverableZooKeeper.close(); 171 } catch (InterruptedException ie) { 172 LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper); 173 Thread.currentThread().interrupt(); 174 } 175 throw zce; 176 } 177 } 178 } 179 180 private void createBaseZNodes() throws ZooKeeperConnectionException { 181 try { 182 // Create all the necessary "directories" of znodes 183 ZKUtil.createWithParents(this, znodePaths.baseZNode); 184 ZKUtil.createAndFailSilent(this, znodePaths.rsZNode); 185 ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode); 186 ZKUtil.createAndFailSilent(this, znodePaths.tableZNode); 187 ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode); 188 ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode); 189 ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode); 190 } catch (KeeperException e) { 191 throw new ZooKeeperConnectionException( 192 prefix("Unexpected KeeperException creating base node"), e); 193 } 194 } 195 196 /** 197 * On master start, we check the znode ACLs under the root directory and set the ACLs properly 198 * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed 199 * so that the existing znodes created with open permissions are now changed with restrictive 200 * perms. 201 */ 202 public void checkAndSetZNodeAcls() { 203 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) { 204 LOG.info("not a secure deployment, proceeding"); 205 return; 206 } 207 208 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 209 // correct. 210 try { 211 List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat()); 212 213 if (!isBaseZnodeAclSetup(actualAcls)) { 214 LOG.info("setting znode ACLs"); 215 setZnodeAclsRecursive(znodePaths.baseZNode); 216 } 217 } catch(KeeperException.NoNodeException nne) { 218 return; 219 } catch(InterruptedException ie) { 220 interruptedExceptionNoThrow(ie, false); 221 } catch (IOException|KeeperException e) { 222 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 223 } 224 } 225 226 /** 227 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs 228 * will be set last in case the master fails in between. 229 * @param znode the ZNode to set the permissions for 230 */ 231 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 232 List<String> children = recoverableZooKeeper.getChildren(znode, false); 233 234 for (String child : children) { 235 setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child)); 236 } 237 List<ACL> acls = ZKUtil.createACL(this, znode, true); 238 LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls); 239 recoverableZooKeeper.setAcl(znode, acls, -1); 240 } 241 242 /** 243 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 244 * @param acls acls from zookeeper 245 * @return whether ACLs are set for the base znode 246 * @throws IOException if getting the current user fails 247 */ 248 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 249 if (LOG.isDebugEnabled()) { 250 LOG.debug("Checking znode ACLs"); 251 } 252 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 253 // Check whether ACL set for all superusers 254 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 255 return false; 256 } 257 258 // this assumes that current authenticated user is the same as zookeeper client user 259 // configured via JAAS 260 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 261 262 if (acls.isEmpty()) { 263 if (LOG.isDebugEnabled()) { 264 LOG.debug("ACL is empty"); 265 } 266 return false; 267 } 268 269 for (ACL acl : acls) { 270 int perms = acl.getPerms(); 271 Id id = acl.getId(); 272 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 273 // and one for the hbase user 274 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 275 if (perms != Perms.READ) { 276 if (LOG.isDebugEnabled()) { 277 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 278 id, perms, Perms.READ)); 279 } 280 return false; 281 } 282 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 283 if (perms != Perms.ALL) { 284 if (LOG.isDebugEnabled()) { 285 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 286 id, perms, Perms.ALL)); 287 } 288 return false; 289 } 290 } else if ("sasl".equals(id.getScheme())) { 291 String name = id.getId(); 292 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 293 Matcher match = NAME_PATTERN.matcher(name); 294 if (match.matches()) { 295 name = match.group(1); 296 } 297 if (name.equals(hbaseUser)) { 298 if (perms != Perms.ALL) { 299 if (LOG.isDebugEnabled()) { 300 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 301 id, perms, Perms.ALL)); 302 } 303 return false; 304 } 305 } else { 306 if (LOG.isDebugEnabled()) { 307 LOG.debug("Unexpected shortname in SASL ACL: {}", id); 308 } 309 return false; 310 } 311 } else { 312 if (LOG.isDebugEnabled()) { 313 LOG.debug("unexpected ACL id '{}'", id); 314 } 315 return false; 316 } 317 } 318 return true; 319 } 320 321 /* 322 * Validate whether ACL set for all superusers. 323 */ 324 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 325 for (String user : superUsers) { 326 boolean hasAccess = false; 327 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 328 if (!AuthUtil.isGroupPrincipal(user)) { 329 for (ACL acl : acls) { 330 if (user.equals(acl.getId().getId())) { 331 if (acl.getPerms() == Perms.ALL) { 332 hasAccess = true; 333 } else { 334 if (LOG.isDebugEnabled()) { 335 LOG.debug(String.format( 336 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 337 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 338 } 339 } 340 break; 341 } 342 } 343 if (!hasAccess) { 344 return false; 345 } 346 } 347 } 348 return true; 349 } 350 351 /* 352 * Validate whether ACL ID is superuser. 353 */ 354 public static boolean isSuperUserId(String[] superUsers, Id id) { 355 for (String user : superUsers) { 356 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 357 if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) { 358 return true; 359 } 360 } 361 return false; 362 } 363 364 @Override 365 public String toString() { 366 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode; 367 } 368 369 /** 370 * Adds this instance's identifier as a prefix to the passed <code>str</code> 371 * @param str String to amend. 372 * @return A new string with this instance's identifier as prefix: e.g. 373 * if passed 'hello world', the returned string could be 374 */ 375 public String prefix(final String str) { 376 return this.toString() + " " + str; 377 } 378 379 /** 380 * Get the znodes corresponding to the meta replicas from ZK 381 * @return list of znodes 382 * @throws KeeperException if a ZooKeeper operation fails 383 */ 384 public List<String> getMetaReplicaNodes() throws KeeperException { 385 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); 386 List<String> metaReplicaNodes = new ArrayList<>(2); 387 if (childrenOfBaseNode != null) { 388 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server"); 389 for (String child : childrenOfBaseNode) { 390 if (child.startsWith(pattern)) { 391 metaReplicaNodes.add(child); 392 } 393 } 394 } 395 return metaReplicaNodes; 396 } 397 398 /** 399 * Register the specified listener to receive ZooKeeper events. 400 * @param listener the listener to register 401 */ 402 public void registerListener(ZKListener listener) { 403 listeners.add(listener); 404 } 405 406 /** 407 * Register the specified listener to receive ZooKeeper events and add it as 408 * the first in the list of current listeners. 409 * @param listener the listener to register 410 */ 411 public void registerListenerFirst(ZKListener listener) { 412 listeners.add(0, listener); 413 } 414 415 public void unregisterListener(ZKListener listener) { 416 listeners.remove(listener); 417 } 418 419 /** 420 * Clean all existing listeners 421 */ 422 public void unregisterAllListeners() { 423 listeners.clear(); 424 } 425 426 /** 427 * Get a copy of current registered listeners 428 */ 429 public List<ZKListener> getListeners() { 430 return new ArrayList<>(listeners); 431 } 432 433 /** 434 * @return The number of currently registered listeners 435 */ 436 public int getNumberOfListeners() { 437 return listeners.size(); 438 } 439 440 /** 441 * Get the connection to ZooKeeper. 442 * @return connection reference to zookeeper 443 */ 444 public RecoverableZooKeeper getRecoverableZooKeeper() { 445 return recoverableZooKeeper; 446 } 447 448 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 449 recoverableZooKeeper.reconnectAfterExpiration(); 450 } 451 452 /** 453 * Get the quorum address of this instance. 454 * @return quorum string of this zookeeper connection instance 455 */ 456 public String getQuorum() { 457 return quorum; 458 } 459 460 /** 461 * Get the znodePaths. 462 * <p> 463 * Mainly used for mocking as mockito can not mock a field access. 464 */ 465 public ZNodePaths getZNodePaths() { 466 return znodePaths; 467 } 468 469 /** 470 * Method called from ZooKeeper for events and connection status. 471 * <p> 472 * Valid events are passed along to listeners. Connection status changes 473 * are dealt with locally. 474 */ 475 @Override 476 public void process(WatchedEvent event) { 477 LOG.debug(prefix("Received ZooKeeper Event, " + 478 "type=" + event.getType() + ", " + 479 "state=" + event.getState() + ", " + 480 "path=" + event.getPath())); 481 482 switch(event.getType()) { 483 484 // If event type is NONE, this is a connection status change 485 case None: { 486 connectionEvent(event); 487 break; 488 } 489 490 // Otherwise pass along to the listeners 491 492 case NodeCreated: { 493 for(ZKListener listener : listeners) { 494 listener.nodeCreated(event.getPath()); 495 } 496 break; 497 } 498 499 case NodeDeleted: { 500 for(ZKListener listener : listeners) { 501 listener.nodeDeleted(event.getPath()); 502 } 503 break; 504 } 505 506 case NodeDataChanged: { 507 for(ZKListener listener : listeners) { 508 listener.nodeDataChanged(event.getPath()); 509 } 510 break; 511 } 512 513 case NodeChildrenChanged: { 514 for(ZKListener listener : listeners) { 515 listener.nodeChildrenChanged(event.getPath()); 516 } 517 break; 518 } 519 default: 520 throw new IllegalStateException("Received event is not valid: " + event.getState()); 521 } 522 } 523 524 // Connection management 525 526 /** 527 * Called when there is a connection-related event via the Watcher callback. 528 * <p> 529 * If Disconnected or Expired, this should shutdown the cluster. But, since 530 * we send a KeeperException.SessionExpiredException along with the abort 531 * call, it's possible for the Abortable to catch it and try to create a new 532 * session with ZooKeeper. This is what the client does in HCM. 533 * <p> 534 * @param event the connection-related event 535 */ 536 private void connectionEvent(WatchedEvent event) { 537 switch(event.getState()) { 538 case SyncConnected: 539 this.identifier = this.prefix + "-0x" + 540 Long.toHexString(this.recoverableZooKeeper.getSessionId()); 541 // Update our identifier. Otherwise ignore. 542 LOG.debug("{} connected", this.identifier); 543 break; 544 545 // Abort the server if Disconnected or Expired 546 case Disconnected: 547 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 548 break; 549 550 case Expired: 551 String msg = prefix(this.identifier + " received expired from " + 552 "ZooKeeper, aborting"); 553 // TODO: One thought is to add call to ZKListener so say, 554 // ZKNodeTracker can zero out its data values. 555 if (this.abortable != null) { 556 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 557 } 558 break; 559 560 case ConnectedReadOnly: 561 case SaslAuthenticated: 562 case AuthFailed: 563 break; 564 565 default: 566 throw new IllegalStateException("Received event is not valid: " + event.getState()); 567 } 568 } 569 570 /** 571 * Forces a synchronization of this ZooKeeper client connection. 572 * <p> 573 * Executing this method before running other methods will ensure that the 574 * subsequent operations are up-to-date and consistent as of the time that 575 * the sync is complete. 576 * <p> 577 * This is used for compareAndSwap type operations where we need to read the 578 * data of an existing node and delete or transition that node, utilizing the 579 * previously read version and data. We want to ensure that the version read 580 * is up-to-date from when we begin the operation. 581 */ 582 public void sync(String path) throws KeeperException { 583 this.recoverableZooKeeper.sync(path, null, null); 584 } 585 586 /** 587 * Handles KeeperExceptions in client calls. 588 * <p> 589 * This may be temporary but for now this gives one place to deal with these. 590 * <p> 591 * TODO: Currently this method rethrows the exception to let the caller handle 592 * <p> 593 * @param ke the exception to rethrow 594 * @throws KeeperException if a ZooKeeper operation fails 595 */ 596 public void keeperException(KeeperException ke) throws KeeperException { 597 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 598 throw ke; 599 } 600 601 /** 602 * Handles InterruptedExceptions in client calls. 603 * @param ie the InterruptedException instance thrown 604 * @throws KeeperException the exception to throw, transformed from the InterruptedException 605 */ 606 public void interruptedException(InterruptedException ie) throws KeeperException { 607 interruptedExceptionNoThrow(ie, true); 608 // Throw a system error exception to let upper level handle it 609 KeeperException keeperException = new KeeperException.SystemErrorException(); 610 keeperException.initCause(ie); 611 throw keeperException; 612 } 613 614 /** 615 * Log the InterruptedException and interrupt current thread 616 * @param ie The IterruptedException to log 617 * @param throwLater Whether we will throw the exception latter 618 */ 619 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) { 620 LOG.debug(prefix("Received InterruptedException, will interrupt current thread" 621 + (throwLater ? " and rethrow a SystemErrorException" : "")), 622 ie); 623 // At least preserve interrupt. 624 Thread.currentThread().interrupt(); 625 } 626 627 /** 628 * Close the connection to ZooKeeper. 629 * 630 */ 631 @Override 632 public void close() { 633 try { 634 recoverableZooKeeper.close(); 635 } catch (InterruptedException e) { 636 Thread.currentThread().interrupt(); 637 } 638 } 639 640 public Configuration getConfiguration() { 641 return conf; 642 } 643 644 @Override 645 public void abort(String why, Throwable e) { 646 if (this.abortable != null) { 647 this.abortable.abort(why, e); 648 } else { 649 this.aborted = true; 650 } 651 } 652 653 @Override 654 public boolean isAborted() { 655 return this.abortable == null? this.aborted: this.abortable.isAborted(); 656 } 657}