001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.concurrent.CountDownLatch; 027import java.util.regex.Matcher; 028import java.util.regex.Pattern; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Abortable; 032import org.apache.hadoop.hbase.AuthUtil; 033import org.apache.hadoop.hbase.ZooKeeperConnectionException; 034import org.apache.hadoop.hbase.security.Superusers; 035import org.apache.hadoop.security.UserGroupInformation; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.WatchedEvent; 039import org.apache.zookeeper.Watcher; 040import org.apache.zookeeper.ZooDefs.Ids; 041import org.apache.zookeeper.ZooDefs.Perms; 042import org.apache.zookeeper.data.ACL; 043import org.apache.zookeeper.data.Id; 044import org.apache.zookeeper.data.Stat; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated 050 * for each Master, RegionServer, and client process. 051 * 052 * <p>This is the only class that implements {@link Watcher}. Other internal 053 * classes which need to be notified of ZooKeeper events must register with 054 * the local instance of this watcher via {@link #registerListener}. 055 * 056 * <p>This class also holds and manages the connection to ZooKeeper. Code to 057 * deal with connection related events and exceptions are handled here. 058 */ 059@InterfaceAudience.Private 060public class ZKWatcher implements Watcher, Abortable, Closeable { 061 private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class); 062 063 // Identifier for this watcher (for logging only). It is made of the prefix 064 // passed on construction and the zookeeper sessionid. 065 private String prefix; 066 private String identifier; 067 068 // zookeeper quorum 069 private String quorum; 070 071 // zookeeper connection 072 private final RecoverableZooKeeper recoverableZooKeeper; 073 074 // abortable in case of zk failure 075 protected Abortable abortable; 076 // Used if abortable is null 077 private boolean aborted = false; 078 079 private final ZNodePaths znodePaths; 080 081 // listeners to be notified 082 private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); 083 084 // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL 085 // negotiation to complete 086 private CountDownLatch saslLatch = new CountDownLatch(1); 087 088 private final Configuration conf; 089 090 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 091 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 092 093 /** 094 * Instantiate a ZooKeeper connection and watcher. 095 * @param identifier string that is passed to RecoverableZookeeper to be used as 096 * identifier for this instance. Use null for default. 097 * @throws IOException if the connection to ZooKeeper fails 098 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 099 */ 100 public ZKWatcher(Configuration conf, String identifier, 101 Abortable abortable) throws ZooKeeperConnectionException, IOException { 102 this(conf, identifier, abortable, false); 103 } 104 105 /** 106 * Instantiate a ZooKeeper connection and watcher. 107 * @param conf the configuration to use 108 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 109 * this instance. Use null for default. 110 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 111 * context. 112 * @param canCreateBaseZNode true if a base ZNode can be created 113 * @throws IOException if the connection to ZooKeeper fails 114 * @throws ZooKeeperConnectionException if the client can't connect to ZooKeeper 115 */ 116 public ZKWatcher(Configuration conf, String identifier, 117 Abortable abortable, boolean canCreateBaseZNode) 118 throws IOException, ZooKeeperConnectionException { 119 this(conf, identifier, abortable, canCreateBaseZNode, false); 120 } 121 122 /** 123 * Instantiate a ZooKeeper connection and watcher. 124 * @param conf the configuration to use 125 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 126 * this instance. Use null for default. 127 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 128 * context. 129 * @param canCreateBaseZNode true if a base ZNode can be created 130 * @param clientZK whether this watcher is set to access client ZK 131 * @throws IOException if the connection to ZooKeeper fails 132 * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base 133 * ZNodes 134 */ 135 public ZKWatcher(Configuration conf, String identifier, Abortable abortable, 136 boolean canCreateBaseZNode, boolean clientZK) 137 throws IOException, ZooKeeperConnectionException { 138 this.conf = conf; 139 if (clientZK) { 140 String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); 141 String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf); 142 if (clientZkQuorumServers != null) { 143 if (clientZkQuorumServers.equals(serverZkQuorumServers)) { 144 // Don't allow same settings to avoid dead loop when master trying 145 // to sync meta information from server ZK to client ZK 146 throw new IllegalArgumentException( 147 "The quorum settings for client ZK should be different from those for server"); 148 } 149 this.quorum = clientZkQuorumServers; 150 } else { 151 this.quorum = serverZkQuorumServers; 152 } 153 } else { 154 this.quorum = ZKConfig.getZKQuorumServersString(conf); 155 } 156 this.prefix = identifier; 157 // Identifier will get the sessionid appended later below down when we 158 // handle the syncconnect event. 159 this.identifier = identifier + "0x0"; 160 this.abortable = abortable; 161 this.znodePaths = new ZNodePaths(conf); 162 PendingWatcher pendingWatcher = new PendingWatcher(); 163 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); 164 pendingWatcher.prepare(this); 165 if (canCreateBaseZNode) { 166 try { 167 createBaseZNodes(); 168 } catch (ZooKeeperConnectionException zce) { 169 try { 170 this.recoverableZooKeeper.close(); 171 } catch (InterruptedException ie) { 172 LOG.debug("Encountered InterruptedException when closing {}", this.recoverableZooKeeper); 173 Thread.currentThread().interrupt(); 174 } 175 throw zce; 176 } 177 } 178 } 179 180 private void createBaseZNodes() throws ZooKeeperConnectionException { 181 try { 182 // Create all the necessary "directories" of znodes 183 ZKUtil.createWithParents(this, znodePaths.baseZNode); 184 ZKUtil.createAndFailSilent(this, znodePaths.rsZNode); 185 ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode); 186 ZKUtil.createAndFailSilent(this, znodePaths.tableZNode); 187 ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode); 188 ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode); 189 ZKUtil.createAndFailSilent(this, znodePaths.tableLockZNode); 190 ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode); 191 } catch (KeeperException e) { 192 throw new ZooKeeperConnectionException( 193 prefix("Unexpected KeeperException creating base node"), e); 194 } 195 } 196 197 /** 198 * On master start, we check the znode ACLs under the root directory and set the ACLs properly 199 * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed 200 * so that the existing znodes created with open permissions are now changed with restrictive 201 * perms. 202 */ 203 public void checkAndSetZNodeAcls() { 204 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) { 205 LOG.info("not a secure deployment, proceeding"); 206 return; 207 } 208 209 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 210 // correct. 211 try { 212 List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat()); 213 214 if (!isBaseZnodeAclSetup(actualAcls)) { 215 LOG.info("setting znode ACLs"); 216 setZnodeAclsRecursive(znodePaths.baseZNode); 217 } 218 } catch(KeeperException.NoNodeException nne) { 219 return; 220 } catch(InterruptedException ie) { 221 interruptedExceptionNoThrow(ie, false); 222 } catch (IOException|KeeperException e) { 223 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 224 } 225 } 226 227 /** 228 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs 229 * will be set last in case the master fails in between. 230 * @param znode the ZNode to set the permissions for 231 */ 232 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 233 List<String> children = recoverableZooKeeper.getChildren(znode, false); 234 235 for (String child : children) { 236 setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child)); 237 } 238 List<ACL> acls = ZKUtil.createACL(this, znode, true); 239 LOG.info("Setting ACLs for znode:{} , acl:{}", znode, acls); 240 recoverableZooKeeper.setAcl(znode, acls, -1); 241 } 242 243 /** 244 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 245 * @param acls acls from zookeeper 246 * @return whether ACLs are set for the base znode 247 * @throws IOException if getting the current user fails 248 */ 249 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 250 if (LOG.isDebugEnabled()) { 251 LOG.debug("Checking znode ACLs"); 252 } 253 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 254 // Check whether ACL set for all superusers 255 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 256 return false; 257 } 258 259 // this assumes that current authenticated user is the same as zookeeper client user 260 // configured via JAAS 261 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 262 263 if (acls.isEmpty()) { 264 if (LOG.isDebugEnabled()) { 265 LOG.debug("ACL is empty"); 266 } 267 return false; 268 } 269 270 for (ACL acl : acls) { 271 int perms = acl.getPerms(); 272 Id id = acl.getId(); 273 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 274 // and one for the hbase user 275 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 276 if (perms != Perms.READ) { 277 if (LOG.isDebugEnabled()) { 278 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 279 id, perms, Perms.READ)); 280 } 281 return false; 282 } 283 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 284 if (perms != Perms.ALL) { 285 if (LOG.isDebugEnabled()) { 286 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 287 id, perms, Perms.ALL)); 288 } 289 return false; 290 } 291 } else if ("sasl".equals(id.getScheme())) { 292 String name = id.getId(); 293 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 294 Matcher match = NAME_PATTERN.matcher(name); 295 if (match.matches()) { 296 name = match.group(1); 297 } 298 if (name.equals(hbaseUser)) { 299 if (perms != Perms.ALL) { 300 if (LOG.isDebugEnabled()) { 301 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 302 id, perms, Perms.ALL)); 303 } 304 return false; 305 } 306 } else { 307 if (LOG.isDebugEnabled()) { 308 LOG.debug("Unexpected shortname in SASL ACL: {}", id); 309 } 310 return false; 311 } 312 } else { 313 if (LOG.isDebugEnabled()) { 314 LOG.debug("unexpected ACL id '{}'", id); 315 } 316 return false; 317 } 318 } 319 return true; 320 } 321 322 /* 323 * Validate whether ACL set for all superusers. 324 */ 325 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 326 for (String user : superUsers) { 327 boolean hasAccess = false; 328 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 329 if (!AuthUtil.isGroupPrincipal(user)) { 330 for (ACL acl : acls) { 331 if (user.equals(acl.getId().getId())) { 332 if (acl.getPerms() == Perms.ALL) { 333 hasAccess = true; 334 } else { 335 if (LOG.isDebugEnabled()) { 336 LOG.debug(String.format( 337 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 338 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 339 } 340 } 341 break; 342 } 343 } 344 if (!hasAccess) { 345 return false; 346 } 347 } 348 } 349 return true; 350 } 351 352 /* 353 * Validate whether ACL ID is superuser. 354 */ 355 public static boolean isSuperUserId(String[] superUsers, Id id) { 356 for (String user : superUsers) { 357 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 358 if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) { 359 return true; 360 } 361 } 362 return false; 363 } 364 365 @Override 366 public String toString() { 367 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode; 368 } 369 370 /** 371 * Adds this instance's identifier as a prefix to the passed <code>str</code> 372 * @param str String to amend. 373 * @return A new string with this instance's identifier as prefix: e.g. 374 * if passed 'hello world', the returned string could be 375 */ 376 public String prefix(final String str) { 377 return this.toString() + " " + str; 378 } 379 380 /** 381 * Get the znodes corresponding to the meta replicas from ZK 382 * @return list of znodes 383 * @throws KeeperException if a ZooKeeper operation fails 384 */ 385 public List<String> getMetaReplicaNodes() throws KeeperException { 386 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); 387 List<String> metaReplicaNodes = new ArrayList<>(2); 388 if (childrenOfBaseNode != null) { 389 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server"); 390 for (String child : childrenOfBaseNode) { 391 if (child.startsWith(pattern)) { 392 metaReplicaNodes.add(child); 393 } 394 } 395 } 396 return metaReplicaNodes; 397 } 398 399 /** 400 * Register the specified listener to receive ZooKeeper events. 401 * @param listener the listener to register 402 */ 403 public void registerListener(ZKListener listener) { 404 listeners.add(listener); 405 } 406 407 /** 408 * Register the specified listener to receive ZooKeeper events and add it as 409 * the first in the list of current listeners. 410 * @param listener the listener to register 411 */ 412 public void registerListenerFirst(ZKListener listener) { 413 listeners.add(0, listener); 414 } 415 416 public void unregisterListener(ZKListener listener) { 417 listeners.remove(listener); 418 } 419 420 /** 421 * Clean all existing listeners 422 */ 423 public void unregisterAllListeners() { 424 listeners.clear(); 425 } 426 427 /** 428 * Get a copy of current registered listeners 429 */ 430 public List<ZKListener> getListeners() { 431 return new ArrayList<>(listeners); 432 } 433 434 /** 435 * @return The number of currently registered listeners 436 */ 437 public int getNumberOfListeners() { 438 return listeners.size(); 439 } 440 441 /** 442 * Get the connection to ZooKeeper. 443 * @return connection reference to zookeeper 444 */ 445 public RecoverableZooKeeper getRecoverableZooKeeper() { 446 return recoverableZooKeeper; 447 } 448 449 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 450 recoverableZooKeeper.reconnectAfterExpiration(); 451 } 452 453 /** 454 * Get the quorum address of this instance. 455 * @return quorum string of this zookeeper connection instance 456 */ 457 public String getQuorum() { 458 return quorum; 459 } 460 461 /** 462 * Get the znodePaths. 463 * <p> 464 * Mainly used for mocking as mockito can not mock a field access. 465 */ 466 public ZNodePaths getZNodePaths() { 467 return znodePaths; 468 } 469 470 /** 471 * Method called from ZooKeeper for events and connection status. 472 * <p> 473 * Valid events are passed along to listeners. Connection status changes 474 * are dealt with locally. 475 */ 476 @Override 477 public void process(WatchedEvent event) { 478 LOG.debug(prefix("Received ZooKeeper Event, " + 479 "type=" + event.getType() + ", " + 480 "state=" + event.getState() + ", " + 481 "path=" + event.getPath())); 482 483 switch(event.getType()) { 484 485 // If event type is NONE, this is a connection status change 486 case None: { 487 connectionEvent(event); 488 break; 489 } 490 491 // Otherwise pass along to the listeners 492 493 case NodeCreated: { 494 for(ZKListener listener : listeners) { 495 listener.nodeCreated(event.getPath()); 496 } 497 break; 498 } 499 500 case NodeDeleted: { 501 for(ZKListener listener : listeners) { 502 listener.nodeDeleted(event.getPath()); 503 } 504 break; 505 } 506 507 case NodeDataChanged: { 508 for(ZKListener listener : listeners) { 509 listener.nodeDataChanged(event.getPath()); 510 } 511 break; 512 } 513 514 case NodeChildrenChanged: { 515 for(ZKListener listener : listeners) { 516 listener.nodeChildrenChanged(event.getPath()); 517 } 518 break; 519 } 520 default: 521 throw new IllegalStateException("Received event is not valid: " + event.getState()); 522 } 523 } 524 525 // Connection management 526 527 /** 528 * Called when there is a connection-related event via the Watcher callback. 529 * <p> 530 * If Disconnected or Expired, this should shutdown the cluster. But, since 531 * we send a KeeperException.SessionExpiredException along with the abort 532 * call, it's possible for the Abortable to catch it and try to create a new 533 * session with ZooKeeper. This is what the client does in HCM. 534 * <p> 535 * @param event the connection-related event 536 */ 537 private void connectionEvent(WatchedEvent event) { 538 switch(event.getState()) { 539 case SyncConnected: 540 this.identifier = this.prefix + "-0x" + 541 Long.toHexString(this.recoverableZooKeeper.getSessionId()); 542 // Update our identifier. Otherwise ignore. 543 LOG.debug("{} connected", this.identifier); 544 break; 545 546 // Abort the server if Disconnected or Expired 547 case Disconnected: 548 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 549 break; 550 551 case Expired: 552 String msg = prefix(this.identifier + " received expired from " + 553 "ZooKeeper, aborting"); 554 // TODO: One thought is to add call to ZKListener so say, 555 // ZKNodeTracker can zero out its data values. 556 if (this.abortable != null) { 557 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 558 } 559 break; 560 561 case ConnectedReadOnly: 562 case SaslAuthenticated: 563 case AuthFailed: 564 break; 565 566 default: 567 throw new IllegalStateException("Received event is not valid: " + event.getState()); 568 } 569 } 570 571 /** 572 * Forces a synchronization of this ZooKeeper client connection. 573 * <p> 574 * Executing this method before running other methods will ensure that the 575 * subsequent operations are up-to-date and consistent as of the time that 576 * the sync is complete. 577 * <p> 578 * This is used for compareAndSwap type operations where we need to read the 579 * data of an existing node and delete or transition that node, utilizing the 580 * previously read version and data. We want to ensure that the version read 581 * is up-to-date from when we begin the operation. 582 */ 583 public void sync(String path) throws KeeperException { 584 this.recoverableZooKeeper.sync(path, null, null); 585 } 586 587 /** 588 * Handles KeeperExceptions in client calls. 589 * <p> 590 * This may be temporary but for now this gives one place to deal with these. 591 * <p> 592 * TODO: Currently this method rethrows the exception to let the caller handle 593 * <p> 594 * @param ke the exception to rethrow 595 * @throws KeeperException if a ZooKeeper operation fails 596 */ 597 public void keeperException(KeeperException ke) throws KeeperException { 598 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 599 throw ke; 600 } 601 602 /** 603 * Handles InterruptedExceptions in client calls. 604 * @param ie the InterruptedException instance thrown 605 * @throws KeeperException the exception to throw, transformed from the InterruptedException 606 */ 607 public void interruptedException(InterruptedException ie) throws KeeperException { 608 interruptedExceptionNoThrow(ie, true); 609 // Throw a system error exception to let upper level handle it 610 KeeperException keeperException = new KeeperException.SystemErrorException(); 611 keeperException.initCause(ie); 612 throw keeperException; 613 } 614 615 /** 616 * Log the InterruptedException and interrupt current thread 617 * @param ie The IterruptedException to log 618 * @param throwLater Whether we will throw the exception latter 619 */ 620 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) { 621 LOG.debug(prefix("Received InterruptedException, will interrupt current thread" 622 + (throwLater ? " and rethrow a SystemErrorException" : "")), 623 ie); 624 // At least preserve interrupt. 625 Thread.currentThread().interrupt(); 626 } 627 628 /** 629 * Close the connection to ZooKeeper. 630 * 631 */ 632 @Override 633 public void close() { 634 try { 635 recoverableZooKeeper.close(); 636 } catch (InterruptedException e) { 637 Thread.currentThread().interrupt(); 638 } 639 } 640 641 public Configuration getConfiguration() { 642 return conf; 643 } 644 645 @Override 646 public void abort(String why, Throwable e) { 647 if (this.abortable != null) { 648 this.abortable.abort(why, e); 649 } else { 650 this.aborted = true; 651 } 652 } 653 654 @Override 655 public boolean isAborted() { 656 return this.abortable == null? this.aborted: this.abortable.isAborted(); 657 } 658}