001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.concurrent.CountDownLatch; 027import java.util.regex.Matcher; 028import java.util.regex.Pattern; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Abortable; 032import org.apache.hadoop.hbase.AuthUtil; 033import org.apache.hadoop.hbase.ZooKeeperConnectionException; 034import org.apache.hadoop.hbase.security.Superusers; 035import org.apache.hadoop.security.UserGroupInformation; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.WatchedEvent; 039import org.apache.zookeeper.Watcher; 040import org.apache.zookeeper.ZooDefs.Ids; 041import org.apache.zookeeper.ZooDefs.Perms; 042import org.apache.zookeeper.data.ACL; 043import org.apache.zookeeper.data.Id; 044import org.apache.zookeeper.data.Stat; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated 050 * for each Master, RegionServer, and client process. 051 * 052 * <p>This is the only class that implements {@link Watcher}. Other internal 053 * classes which need to be notified of ZooKeeper events must register with 054 * the local instance of this watcher via {@link #registerListener}. 055 * 056 * <p>This class also holds and manages the connection to ZooKeeper. Code to 057 * deal with connection related events and exceptions are handled here. 058 */ 059@InterfaceAudience.Private 060public class ZKWatcher implements Watcher, Abortable, Closeable { 061 private static final Logger LOG = LoggerFactory.getLogger(ZKWatcher.class); 062 063 // Identifier for this watcher (for logging only). It is made of the prefix 064 // passed on construction and the zookeeper sessionid. 065 private String prefix; 066 private String identifier; 067 068 // zookeeper quorum 069 private String quorum; 070 071 // zookeeper connection 072 private final RecoverableZooKeeper recoverableZooKeeper; 073 074 // abortable in case of zk failure 075 protected Abortable abortable; 076 // Used if abortable is null 077 private boolean aborted = false; 078 079 public final ZNodePaths znodePaths; 080 081 // listeners to be notified 082 private final List<ZKListener> listeners = new CopyOnWriteArrayList<>(); 083 084 // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL 085 // negotiation to complete 086 public CountDownLatch saslLatch = new CountDownLatch(1); 087 088 private final Configuration conf; 089 090 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 091 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 092 093 /** 094 * Instantiate a ZooKeeper connection and watcher. 095 * @param identifier string that is passed to RecoverableZookeeper to be used as 096 * identifier for this instance. Use null for default. 097 * @throws IOException if the connection to ZooKeeper fails 098 * @throws ZooKeeperConnectionException 099 */ 100 public ZKWatcher(Configuration conf, String identifier, 101 Abortable abortable) throws ZooKeeperConnectionException, IOException { 102 this(conf, identifier, abortable, false); 103 } 104 105 /** 106 * Instantiate a ZooKeeper connection and watcher. 107 * @param conf the configuration to use 108 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 109 * this instance. Use null for default. 110 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 111 * context. 112 * @param canCreateBaseZNode true if a base ZNode can be created 113 * @throws IOException if the connection to ZooKeeper fails 114 * @throws ZooKeeperConnectionException 115 */ 116 public ZKWatcher(Configuration conf, String identifier, 117 Abortable abortable, boolean canCreateBaseZNode) 118 throws IOException, ZooKeeperConnectionException { 119 this.conf = conf; 120 this.quorum = ZKConfig.getZKQuorumServersString(conf); 121 this.prefix = identifier; 122 // Identifier will get the sessionid appended later below down when we 123 // handle the syncconnect event. 124 this.identifier = identifier + "0x0"; 125 this.abortable = abortable; 126 this.znodePaths = new ZNodePaths(conf); 127 PendingWatcher pendingWatcher = new PendingWatcher(); 128 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier); 129 pendingWatcher.prepare(this); 130 if (canCreateBaseZNode) { 131 try { 132 createBaseZNodes(); 133 } catch (ZooKeeperConnectionException zce) { 134 try { 135 this.recoverableZooKeeper.close(); 136 } catch (InterruptedException ie) { 137 LOG.debug("Encountered InterruptedException when closing " + this.recoverableZooKeeper); 138 Thread.currentThread().interrupt(); 139 } 140 throw zce; 141 } 142 } 143 } 144 145 private void createBaseZNodes() throws ZooKeeperConnectionException { 146 try { 147 // Create all the necessary "directories" of znodes 148 ZKUtil.createWithParents(this, znodePaths.baseZNode); 149 ZKUtil.createAndFailSilent(this, znodePaths.rsZNode); 150 ZKUtil.createAndFailSilent(this, znodePaths.drainingZNode); 151 ZKUtil.createAndFailSilent(this, znodePaths.tableZNode); 152 ZKUtil.createAndFailSilent(this, znodePaths.splitLogZNode); 153 ZKUtil.createAndFailSilent(this, znodePaths.backupMasterAddressesZNode); 154 ZKUtil.createAndFailSilent(this, znodePaths.tableLockZNode); 155 ZKUtil.createAndFailSilent(this, znodePaths.masterMaintZNode); 156 } catch (KeeperException e) { 157 throw new ZooKeeperConnectionException( 158 prefix("Unexpected KeeperException creating base node"), e); 159 } 160 } 161 162 /** 163 * On master start, we check the znode ACLs under the root directory and set the ACLs properly 164 * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed 165 * so that the existing znodes created with open permissions are now changed with restrictive 166 * perms. 167 */ 168 public void checkAndSetZNodeAcls() { 169 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) { 170 LOG.info("not a secure deployment, proceeding"); 171 return; 172 } 173 174 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 175 // correct. 176 try { 177 List<ACL> actualAcls = recoverableZooKeeper.getAcl(znodePaths.baseZNode, new Stat()); 178 179 if (!isBaseZnodeAclSetup(actualAcls)) { 180 LOG.info("setting znode ACLs"); 181 setZnodeAclsRecursive(znodePaths.baseZNode); 182 } 183 } catch(KeeperException.NoNodeException nne) { 184 return; 185 } catch(InterruptedException ie) { 186 interruptedExceptionNoThrow(ie, false); 187 } catch (IOException|KeeperException e) { 188 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 189 } 190 } 191 192 /** 193 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs 194 * will be set last in case the master fails in between. 195 * @param znode the ZNode to set the permissions for 196 */ 197 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 198 List<String> children = recoverableZooKeeper.getChildren(znode, false); 199 200 for (String child : children) { 201 setZnodeAclsRecursive(ZNodePaths.joinZNode(znode, child)); 202 } 203 List<ACL> acls = ZKUtil.createACL(this, znode, true); 204 LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls); 205 recoverableZooKeeper.setAcl(znode, acls, -1); 206 } 207 208 /** 209 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 210 * @param acls acls from zookeeper 211 * @return whether ACLs are set for the base znode 212 * @throws IOException if getting the current user fails 213 */ 214 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 215 if (LOG.isDebugEnabled()) { 216 LOG.debug("Checking znode ACLs"); 217 } 218 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 219 // Check whether ACL set for all superusers 220 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 221 return false; 222 } 223 224 // this assumes that current authenticated user is the same as zookeeper client user 225 // configured via JAAS 226 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 227 228 if (acls.isEmpty()) { 229 if (LOG.isDebugEnabled()) { 230 LOG.debug("ACL is empty"); 231 } 232 return false; 233 } 234 235 for (ACL acl : acls) { 236 int perms = acl.getPerms(); 237 Id id = acl.getId(); 238 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 239 // and one for the hbase user 240 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 241 if (perms != Perms.READ) { 242 if (LOG.isDebugEnabled()) { 243 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 244 id, perms, Perms.READ)); 245 } 246 return false; 247 } 248 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 249 if (perms != Perms.ALL) { 250 if (LOG.isDebugEnabled()) { 251 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 252 id, perms, Perms.ALL)); 253 } 254 return false; 255 } 256 } else if ("sasl".equals(id.getScheme())) { 257 String name = id.getId(); 258 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 259 Matcher match = NAME_PATTERN.matcher(name); 260 if (match.matches()) { 261 name = match.group(1); 262 } 263 if (name.equals(hbaseUser)) { 264 if (perms != Perms.ALL) { 265 if (LOG.isDebugEnabled()) { 266 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 267 id, perms, Perms.ALL)); 268 } 269 return false; 270 } 271 } else { 272 if (LOG.isDebugEnabled()) { 273 LOG.debug("Unexpected shortname in SASL ACL: " + id); 274 } 275 return false; 276 } 277 } else { 278 if (LOG.isDebugEnabled()) { 279 LOG.debug("unexpected ACL id '" + id + "'"); 280 } 281 return false; 282 } 283 } 284 return true; 285 } 286 287 /* 288 * Validate whether ACL set for all superusers. 289 */ 290 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 291 for (String user : superUsers) { 292 boolean hasAccess = false; 293 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 294 if (!AuthUtil.isGroupPrincipal(user)) { 295 for (ACL acl : acls) { 296 if (user.equals(acl.getId().getId())) { 297 if (acl.getPerms() == Perms.ALL) { 298 hasAccess = true; 299 } else { 300 if (LOG.isDebugEnabled()) { 301 LOG.debug(String.format( 302 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 303 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 304 } 305 } 306 break; 307 } 308 } 309 if (!hasAccess) { 310 return false; 311 } 312 } 313 } 314 return true; 315 } 316 317 /* 318 * Validate whether ACL ID is superuser. 319 */ 320 public static boolean isSuperUserId(String[] superUsers, Id id) { 321 for (String user : superUsers) { 322 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 323 if (!AuthUtil.isGroupPrincipal(user) && new Id("sasl", user).equals(id)) { 324 return true; 325 } 326 } 327 return false; 328 } 329 330 @Override 331 public String toString() { 332 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + znodePaths.baseZNode; 333 } 334 335 /** 336 * Adds this instance's identifier as a prefix to the passed <code>str</code> 337 * @param str String to amend. 338 * @return A new string with this instance's identifier as prefix: e.g. 339 * if passed 'hello world', the returned string could be 340 */ 341 public String prefix(final String str) { 342 return this.toString() + " " + str; 343 } 344 345 /** 346 * Get the znodes corresponding to the meta replicas from ZK 347 * @return list of znodes 348 * @throws KeeperException if a ZooKeeper operation fails 349 */ 350 public List<String> getMetaReplicaNodes() throws KeeperException { 351 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, znodePaths.baseZNode); 352 List<String> metaReplicaNodes = new ArrayList<>(2); 353 if (childrenOfBaseNode != null) { 354 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server"); 355 for (String child : childrenOfBaseNode) { 356 if (child.startsWith(pattern)) { 357 metaReplicaNodes.add(child); 358 } 359 } 360 } 361 return metaReplicaNodes; 362 } 363 364 /** 365 * Register the specified listener to receive ZooKeeper events. 366 * @param listener the listener to register 367 */ 368 public void registerListener(ZKListener listener) { 369 listeners.add(listener); 370 } 371 372 /** 373 * Register the specified listener to receive ZooKeeper events and add it as 374 * the first in the list of current listeners. 375 * @param listener the listener to register 376 */ 377 public void registerListenerFirst(ZKListener listener) { 378 listeners.add(0, listener); 379 } 380 381 public void unregisterListener(ZKListener listener) { 382 listeners.remove(listener); 383 } 384 385 /** 386 * Clean all existing listeners 387 */ 388 public void unregisterAllListeners() { 389 listeners.clear(); 390 } 391 392 /** 393 * Get a copy of current registered listeners 394 */ 395 public List<ZKListener> getListeners() { 396 return new ArrayList<>(listeners); 397 } 398 399 /** 400 * @return The number of currently registered listeners 401 */ 402 public int getNumberOfListeners() { 403 return listeners.size(); 404 } 405 406 /** 407 * Get the connection to ZooKeeper. 408 * @return connection reference to zookeeper 409 */ 410 public RecoverableZooKeeper getRecoverableZooKeeper() { 411 return recoverableZooKeeper; 412 } 413 414 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 415 recoverableZooKeeper.reconnectAfterExpiration(); 416 } 417 418 /** 419 * Get the quorum address of this instance. 420 * @return quorum string of this zookeeper connection instance 421 */ 422 public String getQuorum() { 423 return quorum; 424 } 425 426 /** 427 * Get the znodePaths. 428 * <p> 429 * Mainly used for mocking as mockito can not mock a field access. 430 */ 431 public ZNodePaths getZNodePaths() { 432 return znodePaths; 433 } 434 435 /** 436 * Method called from ZooKeeper for events and connection status. 437 * <p> 438 * Valid events are passed along to listeners. Connection status changes 439 * are dealt with locally. 440 */ 441 @Override 442 public void process(WatchedEvent event) { 443 LOG.debug(prefix("Received ZooKeeper Event, " + 444 "type=" + event.getType() + ", " + 445 "state=" + event.getState() + ", " + 446 "path=" + event.getPath())); 447 448 switch(event.getType()) { 449 450 // If event type is NONE, this is a connection status change 451 case None: { 452 connectionEvent(event); 453 break; 454 } 455 456 // Otherwise pass along to the listeners 457 458 case NodeCreated: { 459 for(ZKListener listener : listeners) { 460 listener.nodeCreated(event.getPath()); 461 } 462 break; 463 } 464 465 case NodeDeleted: { 466 for(ZKListener listener : listeners) { 467 listener.nodeDeleted(event.getPath()); 468 } 469 break; 470 } 471 472 case NodeDataChanged: { 473 for(ZKListener listener : listeners) { 474 listener.nodeDataChanged(event.getPath()); 475 } 476 break; 477 } 478 479 case NodeChildrenChanged: { 480 for(ZKListener listener : listeners) { 481 listener.nodeChildrenChanged(event.getPath()); 482 } 483 break; 484 } 485 } 486 } 487 488 // Connection management 489 490 /** 491 * Called when there is a connection-related event via the Watcher callback. 492 * <p> 493 * If Disconnected or Expired, this should shutdown the cluster. But, since 494 * we send a KeeperException.SessionExpiredException along with the abort 495 * call, it's possible for the Abortable to catch it and try to create a new 496 * session with ZooKeeper. This is what the client does in HCM. 497 * <p> 498 * @param event the connection-related event 499 */ 500 private void connectionEvent(WatchedEvent event) { 501 switch(event.getState()) { 502 case SyncConnected: 503 this.identifier = this.prefix + "-0x" + 504 Long.toHexString(this.recoverableZooKeeper.getSessionId()); 505 // Update our identifier. Otherwise ignore. 506 LOG.debug(this.identifier + " connected"); 507 break; 508 509 // Abort the server if Disconnected or Expired 510 case Disconnected: 511 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 512 break; 513 514 case Expired: 515 String msg = prefix(this.identifier + " received expired from " + 516 "ZooKeeper, aborting"); 517 // TODO: One thought is to add call to ZKListener so say, 518 // ZKNodeTracker can zero out its data values. 519 if (this.abortable != null) { 520 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 521 } 522 break; 523 524 case ConnectedReadOnly: 525 case SaslAuthenticated: 526 case AuthFailed: 527 break; 528 529 default: 530 throw new IllegalStateException("Received event is not valid: " + event.getState()); 531 } 532 } 533 534 /** 535 * Forces a synchronization of this ZooKeeper client connection. 536 * <p> 537 * Executing this method before running other methods will ensure that the 538 * subsequent operations are up-to-date and consistent as of the time that 539 * the sync is complete. 540 * <p> 541 * This is used for compareAndSwap type operations where we need to read the 542 * data of an existing node and delete or transition that node, utilizing the 543 * previously read version and data. We want to ensure that the version read 544 * is up-to-date from when we begin the operation. 545 */ 546 public void sync(String path) throws KeeperException { 547 this.recoverableZooKeeper.sync(path, null, null); 548 } 549 550 /** 551 * Handles KeeperExceptions in client calls. 552 * <p> 553 * This may be temporary but for now this gives one place to deal with these. 554 * <p> 555 * TODO: Currently this method rethrows the exception to let the caller handle 556 * <p> 557 * @param ke the exception to rethrow 558 * @throws KeeperException if a ZooKeeper operation fails 559 */ 560 public void keeperException(KeeperException ke) throws KeeperException { 561 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 562 throw ke; 563 } 564 565 /** 566 * Handles InterruptedExceptions in client calls. 567 * @param ie the InterruptedException instance thrown 568 * @throws KeeperException the exception to throw, transformed from the InterruptedException 569 */ 570 public void interruptedException(InterruptedException ie) throws KeeperException { 571 interruptedExceptionNoThrow(ie, true); 572 // Throw a system error exception to let upper level handle it 573 throw new KeeperException.SystemErrorException(); 574 } 575 576 /** 577 * Log the InterruptedException and interrupt current thread 578 * @param ie The IterruptedException to log 579 * @param throwLater Whether we will throw the exception latter 580 */ 581 public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLater) { 582 LOG.debug(prefix("Received InterruptedException, will interrupt current thread" 583 + (throwLater ? " and rethrow a SystemErrorException" : "")), 584 ie); 585 // At least preserve interrupt. 586 Thread.currentThread().interrupt(); 587 } 588 589 /** 590 * Close the connection to ZooKeeper. 591 * 592 */ 593 @Override 594 public void close() { 595 try { 596 recoverableZooKeeper.close(); 597 } catch (InterruptedException e) { 598 Thread.currentThread().interrupt(); 599 } 600 } 601 602 public Configuration getConfiguration() { 603 return conf; 604 } 605 606 @Override 607 public void abort(String why, Throwable e) { 608 if (this.abortable != null) { 609 this.abortable.abort(why, e); 610 } else { 611 this.aborted = true; 612 } 613 } 614 615 @Override 616 public boolean isAborted() { 617 return this.abortable == null? this.aborted: this.abortable.isAborted(); 618 } 619}