001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.access; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.List; 023import java.util.concurrent.Callable; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.concurrent.RejectedExecutionException; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.hbase.util.Threads; 034import org.apache.hadoop.hbase.zookeeper.ZKListener; 035import org.apache.hadoop.hbase.zookeeper.ZKUtil; 036import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 037import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.apache.zookeeper.KeeperException; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 044 045/** 046 * Handles synchronization of access control list entries and updates throughout all nodes in the 047 * cluster. The {@link AccessController} instance on the {@code _acl_} table regions, creates a 048 * znode for each table as {@code /hbase/acl/tablename}, with the znode data containing a serialized 049 * list of the permissions granted for the table. The {@code AccessController} instances on all 050 * other cluster hosts watch the znodes for updates, which trigger updates in the 051 * {@link AuthManager} permission cache. 052 */ 053@InterfaceAudience.Private 054public class ZKPermissionWatcher extends ZKListener implements Closeable { 055 private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class); 056 // parent node for permissions lists 057 static final String ACL_NODE = "acl"; 058 private final AuthManager authManager; 059 private final String aclZNode; 060 private final CountDownLatch initialized = new CountDownLatch(1); 061 private final ExecutorService executor; 062 private Future<?> childrenChangedFuture; 063 064 public ZKPermissionWatcher(ZKWatcher watcher, AuthManager authManager, Configuration conf) { 065 super(watcher); 066 this.authManager = authManager; 067 String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE); 068 this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent); 069 executor = Executors.newSingleThreadExecutor( 070 new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d").setDaemon(true) 071 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 072 } 073 074 public void start() throws KeeperException { 075 try { 076 watcher.registerListener(this); 077 if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) { 078 try { 079 executor.submit(new Callable<Void>() { 080 @Override 081 public Void call() throws KeeperException { 082 List<ZKUtil.NodeAndData> existing = 083 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 084 if (existing != null) { 085 refreshNodes(existing); 086 } 087 return null; 088 } 089 }).get(); 090 } catch (ExecutionException ex) { 091 if (ex.getCause() instanceof KeeperException) { 092 throw (KeeperException) ex.getCause(); 093 } else { 094 throw new RuntimeException(ex.getCause()); 095 } 096 } catch (InterruptedException ex) { 097 Thread.currentThread().interrupt(); 098 } 099 } 100 } finally { 101 initialized.countDown(); 102 } 103 } 104 105 @Override 106 public void close() { 107 executor.shutdown(); 108 } 109 110 private void waitUntilStarted() { 111 try { 112 initialized.await(); 113 } catch (InterruptedException e) { 114 LOG.warn("Interrupted while waiting for start", e); 115 Thread.currentThread().interrupt(); 116 } 117 } 118 119 @Override 120 public void nodeCreated(String path) { 121 waitUntilStarted(); 122 if (path.equals(aclZNode)) { 123 asyncProcessNodeUpdate(new Runnable() { 124 @Override 125 public void run() { 126 try { 127 List<ZKUtil.NodeAndData> nodes = 128 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); 129 refreshNodes(nodes); 130 } catch (KeeperException ke) { 131 LOG.error("Error reading data from zookeeper", ke); 132 // only option is to abort 133 watcher.abort("ZooKeeper error obtaining acl node children", ke); 134 } 135 } 136 }); 137 } 138 } 139 140 @Override 141 public void nodeDeleted(final String path) { 142 waitUntilStarted(); 143 if (aclZNode.equals(ZKUtil.getParent(path))) { 144 asyncProcessNodeUpdate(new Runnable() { 145 @Override 146 public void run() { 147 String table = ZKUtil.getNodeName(path); 148 if (PermissionStorage.isNamespaceEntry(table)) { 149 authManager.removeNamespace(Bytes.toBytes(table)); 150 } else { 151 authManager.removeTable(TableName.valueOf(table)); 152 } 153 } 154 }); 155 } 156 } 157 158 @Override 159 public void nodeDataChanged(final String path) { 160 waitUntilStarted(); 161 if (aclZNode.equals(ZKUtil.getParent(path))) { 162 asyncProcessNodeUpdate(new Runnable() { 163 @Override 164 public void run() { 165 // update cache on an existing table node 166 String entry = ZKUtil.getNodeName(path); 167 try { 168 byte[] data = ZKUtil.getDataAndWatch(watcher, path); 169 refreshAuthManager(entry, data); 170 } catch (KeeperException ke) { 171 LOG.error("Error reading data from zookeeper for node " + entry, ke); 172 // only option is to abort 173 watcher.abort("ZooKeeper error getting data for node " + entry, ke); 174 } catch (IOException ioe) { 175 LOG.error("Error reading permissions writables", ioe); 176 } 177 } 178 }); 179 } 180 } 181 182 @Override 183 public void nodeChildrenChanged(final String path) { 184 waitUntilStarted(); 185 if (path.equals(aclZNode)) { 186 // preempt any existing nodeChildrenChanged event processing 187 if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) { 188 boolean cancelled = childrenChangedFuture.cancel(true); 189 if (!cancelled) { 190 // task may have finished between our check and attempted cancel, this is fine. 191 if (!childrenChangedFuture.isDone()) { 192 LOG.warn("Could not cancel processing node children changed event, " 193 + "please file a JIRA and attach logs if possible."); 194 } 195 } 196 } 197 childrenChangedFuture = asyncProcessNodeUpdate(() -> { 198 try { 199 final List<ZKUtil.NodeAndData> nodeList = 200 ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode, false); 201 refreshNodes(nodeList); 202 } catch (KeeperException ke) { 203 String msg = "ZooKeeper error while reading node children data for path " + path; 204 LOG.error(msg, ke); 205 watcher.abort(msg, ke); 206 } 207 }); 208 } 209 } 210 211 private Future<?> asyncProcessNodeUpdate(Runnable runnable) { 212 if (!executor.isShutdown()) { 213 try { 214 return executor.submit(runnable); 215 } catch (RejectedExecutionException e) { 216 if (executor.isShutdown()) { 217 LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); 218 } else { 219 throw e; 220 } 221 } 222 } 223 return null; // No task launched so there will be nothing to cancel later 224 } 225 226 private void refreshNodes(List<ZKUtil.NodeAndData> nodes) { 227 for (ZKUtil.NodeAndData n : nodes) { 228 if (Thread.interrupted()) { 229 // Use Thread.interrupted so that we clear interrupt status 230 break; 231 } 232 if (n.isEmpty()) continue; 233 String path = n.getNode(); 234 String entry = (ZKUtil.getNodeName(path)); 235 try { 236 refreshAuthManager(entry, n.getData()); 237 } catch (IOException ioe) { 238 LOG.error("Failed parsing permissions for table '" + entry + "' from zk", ioe); 239 } 240 } 241 } 242 243 private void refreshAuthManager(String entry, byte[] nodeData) throws IOException { 244 if (LOG.isDebugEnabled()) { 245 LOG.debug("Updating permissions cache from {} with data {}", entry, 246 Bytes.toStringBinary(nodeData)); 247 } 248 if (PermissionStorage.isNamespaceEntry(entry)) { 249 authManager.refreshNamespaceCacheFromWritable(PermissionStorage.fromNamespaceEntry(entry), 250 nodeData); 251 } else { 252 authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData); 253 } 254 } 255 256 /*** 257 * Write a table's access controls to the permissions mirror in zookeeper 258 */ 259 public void writeToZookeeper(byte[] entry, byte[] permsData) { 260 String entryName = Bytes.toString(entry); 261 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 262 zkNode = ZNodePaths.joinZNode(zkNode, entryName); 263 264 try { 265 ZKUtil.createWithParents(watcher, zkNode); 266 ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1); 267 } catch (KeeperException e) { 268 LOG.error("Failed updating permissions for entry '" + entryName + "'", e); 269 watcher.abort("Failed writing node " + zkNode + " to zookeeper", e); 270 } 271 } 272 273 /*** 274 * Delete the acl notify node of table 275 */ 276 public void deleteTableACLNode(final TableName tableName) { 277 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 278 zkNode = ZNodePaths.joinZNode(zkNode, tableName.getNameAsString()); 279 280 try { 281 ZKUtil.deleteNode(watcher, zkNode); 282 } catch (KeeperException.NoNodeException e) { 283 LOG.warn("No acl notify node of table '" + tableName + "'"); 284 } catch (KeeperException e) { 285 LOG.error("Failed deleting acl node of table '" + tableName + "'", e); 286 watcher.abort("Failed deleting node " + zkNode, e); 287 } 288 } 289 290 /*** 291 * Delete the acl notify node of namespace 292 */ 293 public void deleteNamespaceACLNode(final String namespace) { 294 String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE); 295 zkNode = ZNodePaths.joinZNode(zkNode, PermissionStorage.NAMESPACE_PREFIX + namespace); 296 297 try { 298 ZKUtil.deleteNode(watcher, zkNode); 299 } catch (KeeperException.NoNodeException e) { 300 LOG.warn("No acl notify node of namespace '" + namespace + "'"); 301 } catch (KeeperException e) { 302 LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e); 303 watcher.abort("Failed deleting node " + zkNode, e); 304 } 305 } 306}