001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.access;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.Callable;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.RejectedExecutionException;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.Threads;
034import org.apache.hadoop.hbase.zookeeper.ZKListener;
035import org.apache.hadoop.hbase.zookeeper.ZKUtil;
036import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
037import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.zookeeper.KeeperException;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
044
045/**
046 * Handles synchronization of access control list entries and updates throughout all nodes in the
047 * cluster. The {@link AccessController} instance on the {@code _acl_} table regions, creates a
048 * znode for each table as {@code /hbase/acl/tablename}, with the znode data containing a serialized
049 * list of the permissions granted for the table. The {@code AccessController} instances on all
050 * other cluster hosts watch the znodes for updates, which trigger updates in the
051 * {@link AuthManager} permission cache.
052 */
053@InterfaceAudience.Private
054public class ZKPermissionWatcher extends ZKListener implements Closeable {
055  private static final Logger LOG = LoggerFactory.getLogger(ZKPermissionWatcher.class);
056  // parent node for permissions lists
057  static final String ACL_NODE = "acl";
058  private final AuthManager authManager;
059  private final String aclZNode;
060  private final CountDownLatch initialized = new CountDownLatch(1);
061  private final ExecutorService executor;
062  private Future<?> childrenChangedFuture;
063
064  public ZKPermissionWatcher(ZKWatcher watcher, AuthManager authManager, Configuration conf) {
065    super(watcher);
066    this.authManager = authManager;
067    String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
068    this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
069    executor = Executors.newSingleThreadExecutor(
070      new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d").setDaemon(true)
071        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
072  }
073
074  public void start() throws KeeperException {
075    try {
076      watcher.registerListener(this);
077      if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
078        try {
079          executor.submit(new Callable<Void>() {
080            @Override
081            public Void call() throws KeeperException {
082              List<ZKUtil.NodeAndData> existing =
083                ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
084              if (existing != null) {
085                refreshNodes(existing);
086              }
087              return null;
088            }
089          }).get();
090        } catch (ExecutionException ex) {
091          if (ex.getCause() instanceof KeeperException) {
092            throw (KeeperException) ex.getCause();
093          } else {
094            throw new RuntimeException(ex.getCause());
095          }
096        } catch (InterruptedException ex) {
097          Thread.currentThread().interrupt();
098        }
099      }
100    } finally {
101      initialized.countDown();
102    }
103  }
104
105  @Override
106  public void close() {
107    executor.shutdown();
108  }
109
110  private void waitUntilStarted() {
111    try {
112      initialized.await();
113    } catch (InterruptedException e) {
114      LOG.warn("Interrupted while waiting for start", e);
115      Thread.currentThread().interrupt();
116    }
117  }
118
119  @Override
120  public void nodeCreated(String path) {
121    waitUntilStarted();
122    if (path.equals(aclZNode)) {
123      asyncProcessNodeUpdate(new Runnable() {
124        @Override
125        public void run() {
126          try {
127            List<ZKUtil.NodeAndData> nodes =
128              ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
129            refreshNodes(nodes);
130          } catch (KeeperException ke) {
131            LOG.error("Error reading data from zookeeper", ke);
132            // only option is to abort
133            watcher.abort("ZooKeeper error obtaining acl node children", ke);
134          }
135        }
136      });
137    }
138  }
139
140  @Override
141  public void nodeDeleted(final String path) {
142    waitUntilStarted();
143    if (aclZNode.equals(ZKUtil.getParent(path))) {
144      asyncProcessNodeUpdate(new Runnable() {
145        @Override
146        public void run() {
147          String table = ZKUtil.getNodeName(path);
148          if (PermissionStorage.isNamespaceEntry(table)) {
149            authManager.removeNamespace(Bytes.toBytes(table));
150          } else {
151            authManager.removeTable(TableName.valueOf(table));
152          }
153        }
154      });
155    }
156  }
157
158  @Override
159  public void nodeDataChanged(final String path) {
160    waitUntilStarted();
161    if (aclZNode.equals(ZKUtil.getParent(path))) {
162      asyncProcessNodeUpdate(new Runnable() {
163        @Override
164        public void run() {
165          // update cache on an existing table node
166          String entry = ZKUtil.getNodeName(path);
167          try {
168            byte[] data = ZKUtil.getDataAndWatch(watcher, path);
169            refreshAuthManager(entry, data);
170          } catch (KeeperException ke) {
171            LOG.error("Error reading data from zookeeper for node " + entry, ke);
172            // only option is to abort
173            watcher.abort("ZooKeeper error getting data for node " + entry, ke);
174          } catch (IOException ioe) {
175            LOG.error("Error reading permissions writables", ioe);
176          }
177        }
178      });
179    }
180  }
181
182  @Override
183  public void nodeChildrenChanged(final String path) {
184    waitUntilStarted();
185    if (path.equals(aclZNode)) {
186      // preempt any existing nodeChildrenChanged event processing
187      if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
188        boolean cancelled = childrenChangedFuture.cancel(true);
189        if (!cancelled) {
190          // task may have finished between our check and attempted cancel, this is fine.
191          if (!childrenChangedFuture.isDone()) {
192            LOG.warn("Could not cancel processing node children changed event, "
193              + "please file a JIRA and attach logs if possible.");
194          }
195        }
196      }
197      childrenChangedFuture = asyncProcessNodeUpdate(() -> {
198        try {
199          final List<ZKUtil.NodeAndData> nodeList =
200            ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode, false);
201          refreshNodes(nodeList);
202        } catch (KeeperException ke) {
203          String msg = "ZooKeeper error while reading node children data for path " + path;
204          LOG.error(msg, ke);
205          watcher.abort(msg, ke);
206        }
207      });
208    }
209  }
210
211  private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
212    if (!executor.isShutdown()) {
213      try {
214        return executor.submit(runnable);
215      } catch (RejectedExecutionException e) {
216        if (executor.isShutdown()) {
217          LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
218        } else {
219          throw e;
220        }
221      }
222    }
223    return null; // No task launched so there will be nothing to cancel later
224  }
225
226  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
227    for (ZKUtil.NodeAndData n : nodes) {
228      if (Thread.interrupted()) {
229        // Use Thread.interrupted so that we clear interrupt status
230        break;
231      }
232      if (n.isEmpty()) continue;
233      String path = n.getNode();
234      String entry = (ZKUtil.getNodeName(path));
235      try {
236        refreshAuthManager(entry, n.getData());
237      } catch (IOException ioe) {
238        LOG.error("Failed parsing permissions for table '" + entry + "' from zk", ioe);
239      }
240    }
241  }
242
243  private void refreshAuthManager(String entry, byte[] nodeData) throws IOException {
244    if (LOG.isDebugEnabled()) {
245      LOG.debug("Updating permissions cache from {} with data {}", entry,
246        Bytes.toStringBinary(nodeData));
247    }
248    if (PermissionStorage.isNamespaceEntry(entry)) {
249      authManager.refreshNamespaceCacheFromWritable(PermissionStorage.fromNamespaceEntry(entry),
250        nodeData);
251    } else {
252      authManager.refreshTableCacheFromWritable(TableName.valueOf(entry), nodeData);
253    }
254  }
255
256  /***
257   * Write a table's access controls to the permissions mirror in zookeeper nn
258   */
259  public void writeToZookeeper(byte[] entry, byte[] permsData) {
260    String entryName = Bytes.toString(entry);
261    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
262    zkNode = ZNodePaths.joinZNode(zkNode, entryName);
263
264    try {
265      ZKUtil.createWithParents(watcher, zkNode);
266      ZKUtil.updateExistingNodeData(watcher, zkNode, permsData, -1);
267    } catch (KeeperException e) {
268      LOG.error("Failed updating permissions for entry '" + entryName + "'", e);
269      watcher.abort("Failed writing node " + zkNode + " to zookeeper", e);
270    }
271  }
272
273  /***
274   * Delete the acl notify node of table n
275   */
276  public void deleteTableACLNode(final TableName tableName) {
277    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
278    zkNode = ZNodePaths.joinZNode(zkNode, tableName.getNameAsString());
279
280    try {
281      ZKUtil.deleteNode(watcher, zkNode);
282    } catch (KeeperException.NoNodeException e) {
283      LOG.warn("No acl notify node of table '" + tableName + "'");
284    } catch (KeeperException e) {
285      LOG.error("Failed deleting acl node of table '" + tableName + "'", e);
286      watcher.abort("Failed deleting node " + zkNode, e);
287    }
288  }
289
290  /***
291   * Delete the acl notify node of namespace
292   */
293  public void deleteNamespaceACLNode(final String namespace) {
294    String zkNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, ACL_NODE);
295    zkNode = ZNodePaths.joinZNode(zkNode, PermissionStorage.NAMESPACE_PREFIX + namespace);
296
297    try {
298      ZKUtil.deleteNode(watcher, zkNode);
299    } catch (KeeperException.NoNodeException e) {
300      LOG.warn("No acl notify node of namespace '" + namespace + "'");
301    } catch (KeeperException e) {
302      LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e);
303      watcher.abort("Failed deleting node " + zkNode, e);
304    }
305  }
306}