001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.security.access; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.RejectedExecutionException; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.Threads; 035import org.apache.hadoop.hbase.zookeeper.ZKListener; 036import org.apache.hadoop.hbase.zookeeper.ZKUtil; 037import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 038import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.apache.zookeeper.KeeperException; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 045 046/** 047 * Handles synchronization of access control list entries and updates 048 * throughout all nodes in the cluster. The {@link AccessController} instance 049 * on the {@code _acl_} table regions, creates a znode for each table as 050 * {@code /hbase/acl/tablename}, with the znode data containing a serialized 051 * list of the permissions granted for the table. The {@code AccessController} 052 * instances on all other cluster hosts watch the znodes for updates, which 053 * trigger updates in the {@link AuthManager} permission cache. 054 */ 055@InterfaceAudience.Private 056public class ZKPermissionWatcher extends ZKListener implements Closeable { 057 private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class); 058 // parent node for permissions lists 059 static final String ACL_NODE = "acl"; 060 private final AuthManager authManager; 061 private final String aclZNode; 062 private final CountDownLatch initialized = new CountDownLatch(1); 063 private final ExecutorService executor; 064 private Future<?> childrenChangedFuture; 065 066 public ZKPermissionWatcher(ZKWatcher watcher, 067 AuthManager authManager, Configuration conf) { 068 super(watcher); 069 this.authManager = authManager; 070 String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE); 071 this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent); 072 executor = Executors.newSingleThreadExecutor( 073 new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d").setDaemon(true) 074 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 075 } 076 077 public void start() throws KeeperException { 078 try { 079 watcher.registerListener(this); 080 if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) { 081 try { 082 executor.submit(new Callable<Void>() { 083 @Override 084 public Void call() throws KeeperException { 085 List<ZKUtil.NodeAndData> existing = 086 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 087 if (existing != null) { 088 refreshNodes(existing); 089 } 090 return null; 091 } 092 }).get(); 093 } catch (ExecutionException ex) { 094 if (ex.getCause() instanceof KeeperException) { 095 throw (KeeperException)ex.getCause(); 096 } else { 097 throw new RuntimeException(ex.getCause()); 098 } 099 } catch (InterruptedException ex) { 100 Thread.currentThread().interrupt(); 101 } 102 } 103 } finally { 104 initialized.countDown(); 105 } 106 } 107 108 @Override 109 public void close() { 110 executor.shutdown(); 111 } 112 113 private void waitUntilStarted() { 114 try { 115 initialized.await(); 116 } catch (InterruptedException e) { 117 LOG.warn("Interrupted while waiting for start", e); 118 Thread.currentThread().interrupt(); 119 } 120 } 121 122 @Override 123 public void nodeCreated(String path) { 124 waitUntilStarted(); 125 if (path.equals(aclZNode)) { 126 asyncProcessNodeUpdate(new Runnable() { 127 @Override 128 public void run() { 129 try { 130 List<ZKUtil.NodeAndData> nodes = 131 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 132 refreshNodes(nodes); 133 } catch (KeeperException ke) { 134 LOG.error("Error reading data from zookeeper", ke); 135 // only option is to abort 136 watcher.abort("ZooKeeper error obtaining acl node children", ke); 137 } 138 } 139 }); 140 } 141 } 142 143 @Override 144 public void nodeDeleted(final String path) { 145 waitUntilStarted(); 146 if (aclZNode.equals(ZKUtil.getParent(path))) { 147 asyncProcessNodeUpdate(new Runnable() { 148 @Override 149 public void run() { 150 String table = ZKUtil.getNodeName(path); 151 if (PermissionStorage.isNamespaceEntry(table)) { 152 authManager.removeNamespace(Bytes.toBytes(table)); 153 } else { 154 authManager.removeTable(TableName.valueOf(table)); 155 } 156 } 157 }); 158 } 159 } 160 161 @Override 162 public void nodeDataChanged(final String path) { 163 waitUntilStarted(); 164 if (aclZNode.equals(ZKUtil.getParent(path))) { 165 asyncProcessNodeUpdate(new Runnable() { 166 @Override 167 public void run() { 168 // update cache on an existing table node 169 String entry = ZKUtil.getNodeName(path); 170 try { 171 byte[] data = ZKUtil.getDataAndWatch(watcher, path); 172 refreshAuthManager(entry, data); 173 } catch (KeeperException ke) { 174 LOG.error("Error reading data from zookeeper for node " + entry, ke); 175 // only option is to abort 176 watcher.abort("ZooKeeper error getting data for node " + entry, ke); 177 } catch (IOException ioe) { 178 LOG.error("Error reading permissions writables", ioe); 179 } 180 } 181 }); 182 } 183 } 184 185 186 @Override 187 public void nodeChildrenChanged(final String path) { 188 waitUntilStarted(); 189 if (path.equals(aclZNode)) { 190 // preempt any existing nodeChildrenChanged event processing 191 if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) { 192 boolean cancelled = childrenChangedFuture.cancel(true); 193 if (!cancelled) { 194 // task may have finished between our check and attempted cancel, this is fine. 195 if (!childrenChangedFuture.isDone()) { 196 LOG.warn("Could not cancel processing node children changed event, " 197 + "please file a JIRA and attach logs if possible."); 198 } 199 } 200 } 201 childrenChangedFuture = asyncProcessNodeUpdate(() -> { 202 try { 203 final List<ZKUtil.NodeAndData> nodeList = 204 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode, false); 205 refreshNodes(nodeList); 206 } catch (KeeperException ke) { 207 String msg = "ZooKeeper error while reading node children data for path " + path; 208 LOG.error(msg, ke); 209 watcher.abort(msg, ke); 210 } 211 }); 212 } 213 } 214 215 private Future<?> asyncProcessNodeUpdate(Runnable runnable) { 216 if (!executor.isShutdown()) { 217 try { 218 return executor.submit(runnable); 219 } catch (RejectedExecutionException e) { 220 if (executor.isShutdown()) { 221 LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); 222 } else { 223 throw e; 224 } 225 } 226 } 227 return null; // No task launched so there will be nothing to cancel later 228 } 229 230 private void refreshNodes(List<ZKUtil.NodeAndData> nodes) { 231 for (ZKUtil.NodeAndData n : nodes) { 232 if (Thread.interrupted()) { 233 // Use Thread.interrupted so that we clear interrupt status 234 break; 235 } 236 if (n.isEmpty()) continue; 237 String path = n.getNode(); 238 String entry = (ZKUtil.getNodeName(path)); 239 try { 240 refreshAuthManager(entry, n.getData()); 241 } catch (IOException ioe) { 242 LOG.error("Failed parsing permissions for table '" + entry + 243 "' from zk", ioe); 244 } 245 } 246 } 247 248 private void refreshAuthManager(String entry, byte[] nodeData) throws IOException { 249 if (LOG.isDebugEnabled()) { 250 LOG.debug("Updating permissions cache from {} with data {}", entry, 251 Bytes.toStringBinary(nodeData)); 252 } 253 if (PermissionStorage.isNamespaceEntry(entry)) { 254 authManager.refreshNamespaceCacheFromWritable(PermissionStorage.fromNamespaceEntry(entry), 255 nodeData); 256 } else { 257 authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData); 258 } 259 } 260 261 /*** 262 * Write a table's access controls to the permissions mirror in zookeeper 263 * @param entry 264 * @param permsData 265 */ 266 public void writeToZookeeper(byte[] entry, byte[] permsData) { 267 String entryName = Bytes.toString(entry); 268 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 269 zkNode = ZNodePaths.joinZNode(zkNode, entryName); 270 271 try { 272 ZKUtil.createWithParents(watcher, zkNode); 273 ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1); 274 } catch (KeeperException e) { 275 LOG.error("Failed updating permissions for entry '" + 276 entryName + "'", e); 277 watcher.abort("Failed writing node "+zkNode+" to zookeeper", e); 278 } 279 } 280 281 /*** 282 * Delete the acl notify node of table 283 * @param tableName 284 */ 285 public void deleteTableACLNode(final TableName tableName) { 286 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 287 zkNode = ZNodePaths.joinZNode(zkNode, tableName.getNameAsString()); 288 289 try { 290 ZKUtil.deleteNode(watcher, zkNode); 291 } catch (KeeperException.NoNodeException e) { 292 LOG.warn("No acl notify node of table '" + tableName + "'"); 293 } catch (KeeperException e) { 294 LOG.error("Failed deleting acl node of table '" + tableName + "'", e); 295 watcher.abort("Failed deleting node " + zkNode, e); 296 } 297 } 298 299 /*** 300 * Delete the acl notify node of namespace 301 */ 302 public void deleteNamespaceACLNode(final String namespace) { 303 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 304 zkNode = ZNodePaths.joinZNode(zkNode, PermissionStorage.NAMESPACE_PREFIX + namespace); 305 306 try { 307 ZKUtil.deleteNode(watcher, zkNode); 308 } catch (KeeperException.NoNodeException e) { 309 LOG.warn("No acl notify node of namespace '" + namespace + "'"); 310 } catch (KeeperException e) { 311 LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e); 312 watcher.abort("Failed deleting node " + zkNode, e); 313 } 314 } 315}