001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.security.access; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.hbase.DaemonThreadFactory; 023import org.apache.hadoop.hbase.TableName; 024import org.apache.hadoop.hbase.util.Bytes; 025import org.apache.hadoop.hbase.zookeeper.ZKListener; 026import org.apache.hadoop.hbase.zookeeper.ZKUtil; 027import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 028import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.apache.zookeeper.KeeperException; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import java.io.Closeable; 035import java.io.IOException; 036import java.util.List; 037import java.util.concurrent.Callable; 038import java.util.concurrent.CountDownLatch; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.ExecutorService; 041import java.util.concurrent.Executors; 042import java.util.concurrent.Future; 043import java.util.concurrent.RejectedExecutionException; 044 045/** 046 * Handles synchronization of access control list entries and updates 047 * throughout all nodes in the cluster. The {@link AccessController} instance 048 * on the {@code _acl_} table regions, creates a znode for each table as 049 * {@code /hbase/acl/tablename}, with the znode data containing a serialized 050 * list of the permissions granted for the table. The {@code AccessController} 051 * instances on all other cluster hosts watch the znodes for updates, which 052 * trigger updates in the {@link AuthManager} permission cache. 053 */ 054@InterfaceAudience.Private 055public class ZKPermissionWatcher extends ZKListener implements Closeable { 056 private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class); 057 // parent node for permissions lists 058 static final String ACL_NODE = "acl"; 059 private final AuthManager authManager; 060 private final String aclZNode; 061 private final CountDownLatch initialized = new CountDownLatch(1); 062 private final ExecutorService executor; 063 private Future<?> childrenChangedFuture; 064 065 public ZKPermissionWatcher(ZKWatcher watcher, 066 AuthManager authManager, Configuration conf) { 067 super(watcher); 068 this.authManager = authManager; 069 String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE); 070 this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent); 071 executor = Executors.newSingleThreadExecutor( 072 new DaemonThreadFactory("zk-permission-watcher")); 073 } 074 075 public void start() throws KeeperException { 076 try { 077 watcher.registerListener(this); 078 if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) { 079 try { 080 executor.submit(new Callable<Void>() { 081 @Override 082 public Void call() throws KeeperException { 083 List<ZKUtil.NodeAndData> existing = 084 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 085 if (existing != null) { 086 refreshNodes(existing); 087 } 088 return null; 089 } 090 }).get(); 091 } catch (ExecutionException ex) { 092 if (ex.getCause() instanceof KeeperException) { 093 throw (KeeperException)ex.getCause(); 094 } else { 095 throw new RuntimeException(ex.getCause()); 096 } 097 } catch (InterruptedException ex) { 098 Thread.currentThread().interrupt(); 099 } 100 } 101 } finally { 102 initialized.countDown(); 103 } 104 } 105 106 @Override 107 public void close() { 108 executor.shutdown(); 109 } 110 111 private void waitUntilStarted() { 112 try { 113 initialized.await(); 114 } catch (InterruptedException e) { 115 LOG.warn("Interrupted while waiting for start", e); 116 Thread.currentThread().interrupt(); 117 } 118 } 119 120 @Override 121 public void nodeCreated(String path) { 122 waitUntilStarted(); 123 if (path.equals(aclZNode)) { 124 asyncProcessNodeUpdate(new Runnable() { 125 @Override 126 public void run() { 127 try { 128 List<ZKUtil.NodeAndData> nodes = 129 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 130 refreshNodes(nodes); 131 } catch (KeeperException ke) { 132 LOG.error("Error reading data from zookeeper", ke); 133 // only option is to abort 134 watcher.abort("ZooKeeper error obtaining acl node children", ke); 135 } 136 } 137 }); 138 } 139 } 140 141 @Override 142 public void nodeDeleted(final String path) { 143 waitUntilStarted(); 144 if (aclZNode.equals(ZKUtil.getParent(path))) { 145 asyncProcessNodeUpdate(new Runnable() { 146 @Override 147 public void run() { 148 String table = ZKUtil.getNodeName(path); 149 if(AccessControlLists.isNamespaceEntry(table)) { 150 authManager.removeNamespace(Bytes.toBytes(table)); 151 } else { 152 authManager.removeTable(TableName.valueOf(table)); 153 } 154 } 155 }); 156 } 157 } 158 159 @Override 160 public void nodeDataChanged(final String path) { 161 waitUntilStarted(); 162 if (aclZNode.equals(ZKUtil.getParent(path))) { 163 asyncProcessNodeUpdate(new Runnable() { 164 @Override 165 public void run() { 166 // update cache on an existing table node 167 String entry = ZKUtil.getNodeName(path); 168 try { 169 byte[] data = ZKUtil.getDataAndWatch(watcher, path); 170 refreshAuthManager(entry, data); 171 } catch (KeeperException ke) { 172 LOG.error("Error reading data from zookeeper for node " + entry, ke); 173 // only option is to abort 174 watcher.abort("ZooKeeper error getting data for node " + entry, ke); 175 } catch (IOException ioe) { 176 LOG.error("Error reading permissions writables", ioe); 177 } 178 } 179 }); 180 } 181 } 182 183 184 @Override 185 public void nodeChildrenChanged(final String path) { 186 waitUntilStarted(); 187 if (path.equals(aclZNode)) { 188 try { 189 final List<ZKUtil.NodeAndData> nodeList = 190 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 191 // preempt any existing nodeChildrenChanged event processing 192 if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) { 193 boolean cancelled = childrenChangedFuture.cancel(true); 194 if (!cancelled) { 195 // task may have finished between our check and attempted cancel, this is fine. 196 if (! childrenChangedFuture.isDone()) { 197 LOG.warn("Could not cancel processing node children changed event, " + 198 "please file a JIRA and attach logs if possible."); 199 } 200 } 201 } 202 childrenChangedFuture = asyncProcessNodeUpdate(() -> refreshNodes(nodeList)); 203 } catch (KeeperException ke) { 204 LOG.error("Error reading data from zookeeper for path "+path, ke); 205 watcher.abort("ZooKeeper error get node children for path "+path, ke); 206 } 207 } 208 } 209 210 private Future<?> asyncProcessNodeUpdate(Runnable runnable) { 211 if (!executor.isShutdown()) { 212 try { 213 return executor.submit(runnable); 214 } catch (RejectedExecutionException e) { 215 if (executor.isShutdown()) { 216 LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); 217 } else { 218 throw e; 219 } 220 } 221 } 222 return null; // No task launched so there will be nothing to cancel later 223 } 224 225 private void refreshNodes(List<ZKUtil.NodeAndData> nodes) { 226 for (ZKUtil.NodeAndData n : nodes) { 227 if (Thread.interrupted()) { 228 // Use Thread.interrupted so that we clear interrupt status 229 break; 230 } 231 if (n.isEmpty()) continue; 232 String path = n.getNode(); 233 String entry = (ZKUtil.getNodeName(path)); 234 try { 235 refreshAuthManager(entry, n.getData()); 236 } catch (IOException ioe) { 237 LOG.error("Failed parsing permissions for table '" + entry + 238 "' from zk", ioe); 239 } 240 } 241 } 242 243 private void refreshAuthManager(String entry, byte[] nodeData) throws IOException { 244 if (LOG.isDebugEnabled()) { 245 LOG.debug("Updating permissions cache from {} with data {}", entry, 246 Bytes.toStringBinary(nodeData)); 247 } 248 if(AccessControlLists.isNamespaceEntry(entry)) { 249 authManager.refreshNamespaceCacheFromWritable( 250 AccessControlLists.fromNamespaceEntry(entry), nodeData); 251 } else { 252 authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData); 253 } 254 } 255 256 /*** 257 * Write a table's access controls to the permissions mirror in zookeeper 258 * @param entry 259 * @param permsData 260 */ 261 public void writeToZookeeper(byte[] entry, byte[] permsData) { 262 String entryName = Bytes.toString(entry); 263 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 264 zkNode = ZNodePaths.joinZNode(zkNode, entryName); 265 266 try { 267 ZKUtil.createWithParents(watcher, zkNode); 268 ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1); 269 } catch (KeeperException e) { 270 LOG.error("Failed updating permissions for entry '" + 271 entryName + "'", e); 272 watcher.abort("Failed writing node "+zkNode+" to zookeeper", e); 273 } 274 } 275 276 /*** 277 * Delete the acl notify node of table 278 * @param tableName 279 */ 280 public void deleteTableACLNode(final TableName tableName) { 281 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 282 zkNode = ZNodePaths.joinZNode(zkNode, tableName.getNameAsString()); 283 284 try { 285 ZKUtil.deleteNode(watcher, zkNode); 286 } catch (KeeperException.NoNodeException e) { 287 LOG.warn("No acl notify node of table '" + tableName + "'"); 288 } catch (KeeperException e) { 289 LOG.error("Failed deleting acl node of table '" + tableName + "'", e); 290 watcher.abort("Failed deleting node " + zkNode, e); 291 } 292 } 293 294 /*** 295 * Delete the acl notify node of namespace 296 */ 297 public void deleteNamespaceACLNode(final String namespace) { 298 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 299 zkNode = ZNodePaths.joinZNode(zkNode, AccessControlLists.NAMESPACE_PREFIX + namespace); 300 301 try { 302 ZKUtil.deleteNode(watcher, zkNode); 303 } catch (KeeperException.NoNodeException e) { 304 LOG.warn("No acl notify node of namespace '" + namespace + "'"); 305 } catch (KeeperException e) { 306 LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e); 307 watcher.abort("Failed deleting node " + zkNode, e); 308 } 309 } 310}