001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.access;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.DaemonThreadFactory;
023import org.apache.hadoop.hbase.TableName;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.apache.hadoop.hbase.zookeeper.ZKListener;
026import org.apache.hadoop.hbase.zookeeper.ZKUtil;
027import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
028import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.zookeeper.KeeperException;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import java.io.Closeable;
035import java.io.IOException;
036import java.util.List;
037import java.util.concurrent.Callable;
038import java.util.concurrent.CountDownLatch;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ExecutorService;
041import java.util.concurrent.Executors;
042import java.util.concurrent.Future;
043import java.util.concurrent.RejectedExecutionException;
044
045/**
046 * Handles synchronization of access control list entries and updates
047 * throughout all nodes in the cluster.  The {@link AccessController} instance
048 * on the {@code _acl_} table regions, creates a znode for each table as
049 * {@code /hbase/acl/tablename}, with the znode data containing a serialized
050 * list of the permissions granted for the table.  The {@code AccessController}
051 * instances on all other cluster hosts watch the znodes for updates, which
052 * trigger updates in the {@link AuthManager} permission cache.
053 */
054@InterfaceAudience.Private
055public class ZKPermissionWatcher extends ZKListener implements Closeable {
056  private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class);
057  // parent node for permissions lists
058  static final String ACL_NODE = "acl";
059  private final AuthManager authManager;
060  private final String aclZNode;
061  private final CountDownLatch initialized = new CountDownLatch(1);
062  private final ExecutorService executor;
063  private Future<?> childrenChangedFuture;
064
065  public ZKPermissionWatcher(ZKWatcher watcher,
066      AuthManager authManager, Configuration conf) {
067    super(watcher);
068    this.authManager = authManager;
069    String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
070    this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
071    executor = Executors.newSingleThreadExecutor(
072      new DaemonThreadFactory("zk-permission-watcher"));
073  }
074
075  public void start() throws KeeperException {
076    try {
077      watcher.registerListener(this);
078      if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
079        try {
080          executor.submit(new Callable<Void>() {
081            @Override
082            public Void call() throws KeeperException {
083              List<ZKUtil.NodeAndData> existing =
084                  ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
085              if (existing != null) {
086                refreshNodes(existing);
087              }
088              return null;
089            }
090          }).get();
091        } catch (ExecutionException ex) {
092          if (ex.getCause() instanceof KeeperException) {
093            throw (KeeperException)ex.getCause();
094          } else {
095            throw new RuntimeException(ex.getCause());
096          }
097        } catch (InterruptedException ex) {
098          Thread.currentThread().interrupt();
099        }
100      }
101    } finally {
102      initialized.countDown();
103    }
104  }
105
106  @Override
107  public void close() {
108    executor.shutdown();
109  }
110
111  private void waitUntilStarted() {
112    try {
113      initialized.await();
114    } catch (InterruptedException e) {
115      LOG.warn("Interrupted while waiting for start", e);
116      Thread.currentThread().interrupt();
117    }
118  }
119
120  @Override
121  public void nodeCreated(String path) {
122    waitUntilStarted();
123    if (path.equals(aclZNode)) {
124      asyncProcessNodeUpdate(new Runnable() {
125        @Override
126        public void run() {
127          try {
128            List<ZKUtil.NodeAndData> nodes =
129                ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
130            refreshNodes(nodes);
131          } catch (KeeperException ke) {
132            LOG.error("Error reading data from zookeeper", ke);
133            // only option is to abort
134            watcher.abort("ZooKeeper error obtaining acl node children", ke);
135          }
136        }
137      });
138    }
139  }
140
141  @Override
142  public void nodeDeleted(final String path) {
143    waitUntilStarted();
144    if (aclZNode.equals(ZKUtil.getParent(path))) {
145      asyncProcessNodeUpdate(new Runnable() {
146        @Override
147        public void run() {
148          String table = ZKUtil.getNodeName(path);
149          if (PermissionStorage.isNamespaceEntry(table)) {
150            authManager.removeNamespace(Bytes.toBytes(table));
151          } else {
152            authManager.removeTable(TableName.valueOf(table));
153          }
154        }
155      });
156    }
157  }
158
159  @Override
160  public void nodeDataChanged(final String path) {
161    waitUntilStarted();
162    if (aclZNode.equals(ZKUtil.getParent(path))) {
163      asyncProcessNodeUpdate(new Runnable() {
164        @Override
165        public void run() {
166          // update cache on an existing table node
167          String entry = ZKUtil.getNodeName(path);
168          try {
169            byte[] data = ZKUtil.getDataAndWatch(watcher, path);
170            refreshAuthManager(entry, data);
171          } catch (KeeperException ke) {
172            LOG.error("Error reading data from zookeeper for node " + entry, ke);
173            // only option is to abort
174            watcher.abort("ZooKeeper error getting data for node " + entry, ke);
175          } catch (IOException ioe) {
176            LOG.error("Error reading permissions writables", ioe);
177          }
178        }
179      });
180    }
181  }
182
183
184  @Override
185  public void nodeChildrenChanged(final String path) {
186    waitUntilStarted();
187    if (path.equals(aclZNode)) {
188      // preempt any existing nodeChildrenChanged event processing
189      if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
190        boolean cancelled = childrenChangedFuture.cancel(true);
191        if (!cancelled) {
192          // task may have finished between our check and attempted cancel, this is fine.
193          if (!childrenChangedFuture.isDone()) {
194            LOG.warn("Could not cancel processing node children changed event, "
195              + "please file a JIRA and attach logs if possible.");
196          }
197        }
198      }
199      childrenChangedFuture = asyncProcessNodeUpdate(() -> {
200        try {
201          final List<ZKUtil.NodeAndData> nodeList =
202            ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode, false);
203          refreshNodes(nodeList);
204        } catch (KeeperException ke) {
205          String msg = "ZooKeeper error while reading node children data for path " + path;
206          LOG.error(msg, ke);
207          watcher.abort(msg, ke);
208        }
209      });
210    }
211  }
212
213  private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
214    if (!executor.isShutdown()) {
215      try {
216        return executor.submit(runnable);
217      } catch (RejectedExecutionException e) {
218        if (executor.isShutdown()) {
219          LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
220        } else {
221          throw e;
222        }
223      }
224    }
225    return null; // No task launched so there will be nothing to cancel later
226  }
227
228  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
229    for (ZKUtil.NodeAndData n : nodes) {
230      if (Thread.interrupted()) {
231        // Use Thread.interrupted so that we clear interrupt status
232        break;
233      }
234      if (n.isEmpty()) continue;
235      String path = n.getNode();
236      String entry = (ZKUtil.getNodeName(path));
237      try {
238        refreshAuthManager(entry, n.getData());
239      } catch (IOException ioe) {
240        LOG.error("Failed parsing permissions for table '" + entry +
241            "' from zk", ioe);
242      }
243    }
244  }
245
246  private void refreshAuthManager(String entry, byte[] nodeData) throws IOException {
247    if (LOG.isDebugEnabled()) {
248      LOG.debug("Updating permissions cache from {} with data {}", entry,
249          Bytes.toStringBinary(nodeData));
250    }
251    if (PermissionStorage.isNamespaceEntry(entry)) {
252      authManager.refreshNamespaceCacheFromWritable(PermissionStorage.fromNamespaceEntry(entry),
253        nodeData);
254    } else {
255      authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData);
256    }
257  }
258
259  /***
260   * Write a table's access controls to the permissions mirror in zookeeper
261   * @param entry
262   * @param permsData
263   */
264  public void writeToZookeeper(byte[] entry, byte[] permsData) {
265    String entryName = Bytes.toString(entry);
266    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
267    zkNode = ZNodePaths.joinZNode(zkNode, entryName);
268
269    try {
270      ZKUtil.createWithParents(watcher, zkNode);
271      ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1);
272    } catch (KeeperException e) {
273      LOG.error("Failed updating permissions for entry '" +
274          entryName + "'", e);
275      watcher.abort("Failed writing node "+zkNode+" to zookeeper", e);
276    }
277  }
278
279  /***
280   * Delete the acl notify node of table
281   * @param tableName
282   */
283  public void deleteTableACLNode(final TableName tableName) {
284    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
285    zkNode = ZNodePaths.joinZNode(zkNode, tableName.getNameAsString());
286
287    try {
288      ZKUtil.deleteNode(watcher, zkNode);
289    } catch (KeeperException.NoNodeException e) {
290      LOG.warn("No acl notify node of table '" + tableName + "'");
291    } catch (KeeperException e) {
292      LOG.error("Failed deleting acl node of table '" + tableName + "'", e);
293      watcher.abort("Failed deleting node " + zkNode, e);
294    }
295  }
296
297  /***
298   * Delete the acl notify node of namespace
299   */
300  public void deleteNamespaceACLNode(final String namespace) {
301    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
302    zkNode = ZNodePaths.joinZNode(zkNode, PermissionStorage.NAMESPACE_PREFIX + namespace);
303
304    try {
305      ZKUtil.deleteNode(watcher, zkNode);
306    } catch (KeeperException.NoNodeException e) {
307      LOG.warn("No acl notify node of namespace '" + namespace + "'");
308    } catch (KeeperException e) {
309      LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e);
310      watcher.abort("Failed deleting node " + zkNode, e);
311    }
312  }
313}