001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.security.access; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.hadoop.hbase.DaemonThreadFactory; 023import org.apache.hadoop.hbase.TableName; 024import org.apache.hadoop.hbase.util.Bytes; 025import org.apache.hadoop.hbase.zookeeper.ZKListener; 026import org.apache.hadoop.hbase.zookeeper.ZKUtil; 027import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 028import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.apache.zookeeper.KeeperException; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import java.io.Closeable; 035import java.io.IOException; 036import java.util.List; 037import java.util.concurrent.Callable; 038import java.util.concurrent.CountDownLatch; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.ExecutorService; 041import java.util.concurrent.Executors; 042import java.util.concurrent.Future; 043import java.util.concurrent.RejectedExecutionException; 044 045/** 046 * Handles synchronization of access control list entries and updates 047 * throughout all nodes in the cluster. The {@link AccessController} instance 048 * on the {@code _acl_} table regions, creates a znode for each table as 049 * {@code /hbase/acl/tablename}, with the znode data containing a serialized 050 * list of the permissions granted for the table. The {@code AccessController} 051 * instances on all other cluster hosts watch the znodes for updates, which 052 * trigger updates in the {@link AuthManager} permission cache. 053 */ 054@InterfaceAudience.Private 055public class ZKPermissionWatcher extends ZKListener implements Closeable { 056 private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class); 057 // parent node for permissions lists 058 static final String ACL_NODE = "acl"; 059 private final AuthManager authManager; 060 private final String aclZNode; 061 private final CountDownLatch initialized = new CountDownLatch(1); 062 private final ExecutorService executor; 063 private Future<?> childrenChangedFuture; 064 065 public ZKPermissionWatcher(ZKWatcher watcher, 066 AuthManager authManager, Configuration conf) { 067 super(watcher); 068 this.authManager = authManager; 069 String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE); 070 this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent); 071 executor = Executors.newSingleThreadExecutor( 072 new DaemonThreadFactory("zk-permission-watcher")); 073 } 074 075 public void start() throws KeeperException { 076 try { 077 watcher.registerListener(this); 078 if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) { 079 try { 080 executor.submit(new Callable<Void>() { 081 @Override 082 public Void call() throws KeeperException { 083 List<ZKUtil.NodeAndData> existing = 084 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 085 if (existing != null) { 086 refreshNodes(existing); 087 } 088 return null; 089 } 090 }).get(); 091 } catch (ExecutionException ex) { 092 if (ex.getCause() instanceof KeeperException) { 093 throw (KeeperException)ex.getCause(); 094 } else { 095 throw new RuntimeException(ex.getCause()); 096 } 097 } catch (InterruptedException ex) { 098 Thread.currentThread().interrupt(); 099 } 100 } 101 } finally { 102 initialized.countDown(); 103 } 104 } 105 106 @Override 107 public void close() { 108 executor.shutdown(); 109 } 110 111 private void waitUntilStarted() { 112 try { 113 initialized.await(); 114 } catch (InterruptedException e) { 115 LOG.warn("Interrupted while waiting for start", e); 116 Thread.currentThread().interrupt(); 117 } 118 } 119 120 @Override 121 public void nodeCreated(String path) { 122 waitUntilStarted(); 123 if (path.equals(aclZNode)) { 124 asyncProcessNodeUpdate(new Runnable() { 125 @Override 126 public void run() { 127 try { 128 List<ZKUtil.NodeAndData> nodes = 129 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 130 refreshNodes(nodes); 131 } catch (KeeperException ke) { 132 LOG.error("Error reading data from zookeeper", ke); 133 // only option is to abort 134 watcher.abort("ZooKeeper error obtaining acl node children", ke); 135 } 136 } 137 }); 138 } 139 } 140 141 @Override 142 public void nodeDeleted(final String path) { 143 waitUntilStarted(); 144 if (aclZNode.equals(ZKUtil.getParent(path))) { 145 asyncProcessNodeUpdate(new Runnable() { 146 @Override 147 public void run() { 148 String table = ZKUtil.getNodeName(path); 149 if (PermissionStorage.isNamespaceEntry(table)) { 150 authManager.removeNamespace(Bytes.toBytes(table)); 151 } else { 152 authManager.removeTable(TableName.valueOf(table)); 153 } 154 } 155 }); 156 } 157 } 158 159 @Override 160 public void nodeDataChanged(final String path) { 161 waitUntilStarted(); 162 if (aclZNode.equals(ZKUtil.getParent(path))) { 163 asyncProcessNodeUpdate(new Runnable() { 164 @Override 165 public void run() { 166 // update cache on an existing table node 167 String entry = ZKUtil.getNodeName(path); 168 try { 169 byte[] data = ZKUtil.getDataAndWatch(watcher, path); 170 refreshAuthManager(entry, data); 171 } catch (KeeperException ke) { 172 LOG.error("Error reading data from zookeeper for node " + entry, ke); 173 // only option is to abort 174 watcher.abort("ZooKeeper error getting data for node " + entry, ke); 175 } catch (IOException ioe) { 176 LOG.error("Error reading permissions writables", ioe); 177 } 178 } 179 }); 180 } 181 } 182 183 184 @Override 185 public void nodeChildrenChanged(final String path) { 186 waitUntilStarted(); 187 if (path.equals(aclZNode)) { 188 // preempt any existing nodeChildrenChanged event processing 189 if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) { 190 boolean cancelled = childrenChangedFuture.cancel(true); 191 if (!cancelled) { 192 // task may have finished between our check and attempted cancel, this is fine. 193 if (!childrenChangedFuture.isDone()) { 194 LOG.warn("Could not cancel processing node children changed event, " 195 + "please file a JIRA and attach logs if possible."); 196 } 197 } 198 } 199 childrenChangedFuture = asyncProcessNodeUpdate(() -> { 200 try { 201 final List<ZKUtil.NodeAndData> nodeList = 202 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode, false); 203 refreshNodes(nodeList); 204 } catch (KeeperException ke) { 205 String msg = "ZooKeeper error while reading node children data for path " + path; 206 LOG.error(msg, ke); 207 watcher.abort(msg, ke); 208 } 209 }); 210 } 211 } 212 213 private Future<?> asyncProcessNodeUpdate(Runnable runnable) { 214 if (!executor.isShutdown()) { 215 try { 216 return executor.submit(runnable); 217 } catch (RejectedExecutionException e) { 218 if (executor.isShutdown()) { 219 LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); 220 } else { 221 throw e; 222 } 223 } 224 } 225 return null; // No task launched so there will be nothing to cancel later 226 } 227 228 private void refreshNodes(List<ZKUtil.NodeAndData> nodes) { 229 for (ZKUtil.NodeAndData n : nodes) { 230 if (Thread.interrupted()) { 231 // Use Thread.interrupted so that we clear interrupt status 232 break; 233 } 234 if (n.isEmpty()) continue; 235 String path = n.getNode(); 236 String entry = (ZKUtil.getNodeName(path)); 237 try { 238 refreshAuthManager(entry, n.getData()); 239 } catch (IOException ioe) { 240 LOG.error("Failed parsing permissions for table '" + entry + 241 "' from zk", ioe); 242 } 243 } 244 } 245 246 private void refreshAuthManager(String entry, byte[] nodeData) throws IOException { 247 if (LOG.isDebugEnabled()) { 248 LOG.debug("Updating permissions cache from {} with data {}", entry, 249 Bytes.toStringBinary(nodeData)); 250 } 251 if (PermissionStorage.isNamespaceEntry(entry)) { 252 authManager.refreshNamespaceCacheFromWritable(PermissionStorage.fromNamespaceEntry(entry), 253 nodeData); 254 } else { 255 authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData); 256 } 257 } 258 259 /*** 260 * Write a table's access controls to the permissions mirror in zookeeper 261 * @param entry 262 * @param permsData 263 */ 264 public void writeToZookeeper(byte[] entry, byte[] permsData) { 265 String entryName = Bytes.toString(entry); 266 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 267 zkNode = ZNodePaths.joinZNode(zkNode, entryName); 268 269 try { 270 ZKUtil.createWithParents(watcher, zkNode); 271 ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1); 272 } catch (KeeperException e) { 273 LOG.error("Failed updating permissions for entry '" + 274 entryName + "'", e); 275 watcher.abort("Failed writing node "+zkNode+" to zookeeper", e); 276 } 277 } 278 279 /*** 280 * Delete the acl notify node of table 281 * @param tableName 282 */ 283 public void deleteTableACLNode(final TableName tableName) { 284 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 285 zkNode = ZNodePaths.joinZNode(zkNode, tableName.getNameAsString()); 286 287 try { 288 ZKUtil.deleteNode(watcher, zkNode); 289 } catch (KeeperException.NoNodeException e) { 290 LOG.warn("No acl notify node of table '" + tableName + "'"); 291 } catch (KeeperException e) { 292 LOG.error("Failed deleting acl node of table '" + tableName + "'", e); 293 watcher.abort("Failed deleting node " + zkNode, e); 294 } 295 } 296 297 /*** 298 * Delete the acl notify node of namespace 299 */ 300 public void deleteNamespaceACLNode(final String namespace) { 301 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 302 zkNode = ZNodePaths.joinZNode(zkNode, PermissionStorage.NAMESPACE_PREFIX + namespace); 303 304 try { 305 ZKUtil.deleteNode(watcher, zkNode); 306 } catch (KeeperException.NoNodeException e) { 307 LOG.warn("No acl notify node of namespace '" + namespace + "'"); 308 } catch (KeeperException e) { 309 LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e); 310 watcher.abort("Failed deleting node " + zkNode, e); 311 } 312 } 313}