001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.NavigableMap;
023import java.util.NavigableSet;
024import java.util.concurrent.ConcurrentSkipListMap;
025import org.apache.hadoop.hbase.util.Bytes;
026import org.apache.hadoop.hbase.zookeeper.ZKListener;
027import org.apache.hadoop.hbase.zookeeper.ZKUtil;
028import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
029import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.zookeeper.KeeperException;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
036
037import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
039
040/**
041 * Class servers two purposes: 1. Broadcast NamespaceDescriptor information via ZK (Done by the
042 * Master) 2. Consume broadcasted NamespaceDescriptor changes (Done by the RegionServers)
043 */
044@InterfaceAudience.Private
045public class ZKNamespaceManager extends ZKListener {
046  private static final Logger LOG = LoggerFactory.getLogger(ZKNamespaceManager.class);
047  private final String nsZNode;
048  private final NavigableMap<String, NamespaceDescriptor> cache;
049
050  public ZKNamespaceManager(ZKWatcher zkw) throws IOException {
051    super(zkw);
052    nsZNode = zkw.getZNodePaths().namespaceZNode;
053    cache = new ConcurrentSkipListMap<>();
054  }
055
056  public void start() throws IOException {
057    watcher.registerListener(this);
058    try {
059      if (ZKUtil.watchAndCheckExists(watcher, nsZNode)) {
060        List<ZKUtil.NodeAndData> existing =
061          ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
062        if (existing != null) {
063          refreshNodes(existing);
064        }
065      } else {
066        ZKUtil.createWithParents(watcher, nsZNode);
067      }
068    } catch (KeeperException e) {
069      throw new IOException("Failed to initialize ZKNamespaceManager", e);
070    }
071  }
072
073  public void stop() throws IOException {
074    this.watcher.unregisterListener(this);
075  }
076
077  public NamespaceDescriptor get(String name) {
078    return cache.get(name);
079  }
080
081  public void update(NamespaceDescriptor ns) throws IOException {
082    writeNamespace(ns);
083    cache.put(ns.getName(), ns);
084  }
085
086  public void remove(String name) throws IOException {
087    deleteNamespace(name);
088    cache.remove(name);
089  }
090
091  public NavigableSet<NamespaceDescriptor> list() throws IOException {
092    NavigableSet<NamespaceDescriptor> ret =
093      Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR);
094    for (NamespaceDescriptor ns : cache.values()) {
095      ret.add(ns);
096    }
097    return ret;
098  }
099
100  @Override
101  public void nodeCreated(String path) {
102    if (nsZNode.equals(path)) {
103      try {
104        List<ZKUtil.NodeAndData> nodes =
105          ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
106        refreshNodes(nodes);
107      } catch (KeeperException ke) {
108        String msg = "Error reading data from zookeeper";
109        LOG.error(msg, ke);
110        watcher.abort(msg, ke);
111      } catch (IOException e) {
112        String msg = "Error parsing data from zookeeper";
113        LOG.error(msg, e);
114        watcher.abort(msg, e);
115      }
116    }
117  }
118
119  @Override
120  public void nodeDeleted(String path) {
121    if (nsZNode.equals(ZKUtil.getParent(path))) {
122      String nsName = ZKUtil.getNodeName(path);
123      cache.remove(nsName);
124    }
125  }
126
127  @Override
128  public void nodeDataChanged(String path) {
129    if (nsZNode.equals(ZKUtil.getParent(path))) {
130      try {
131        byte[] data = ZKUtil.getDataAndWatch(watcher, path);
132        NamespaceDescriptor ns =
133          ProtobufUtil.toNamespaceDescriptor(HBaseProtos.NamespaceDescriptor.parseFrom(data));
134        cache.put(ns.getName(), ns);
135      } catch (KeeperException ke) {
136        String msg = "Error reading data from zookeeper for node " + path;
137        LOG.error(msg, ke);
138        // only option is to abort
139        watcher.abort(msg, ke);
140      } catch (IOException ioe) {
141        String msg = "Error deserializing namespace: " + path;
142        LOG.error(msg, ioe);
143        watcher.abort(msg, ioe);
144      }
145    }
146  }
147
148  @Override
149  public void nodeChildrenChanged(String path) {
150    if (nsZNode.equals(path)) {
151      try {
152        List<ZKUtil.NodeAndData> nodes =
153          ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
154        refreshNodes(nodes);
155      } catch (KeeperException ke) {
156        LOG.error("Error reading data from zookeeper for path " + path, ke);
157        watcher.abort("ZooKeeper error get node children for path " + path, ke);
158      } catch (IOException e) {
159        LOG.error("Error deserializing namespace child from: " + path, e);
160        watcher.abort("Error deserializing namespace child from: " + path, e);
161      }
162    }
163  }
164
165  private void deleteNamespace(String name) throws IOException {
166    String zNode = ZNodePaths.joinZNode(nsZNode, name);
167    try {
168      ZKUtil.deleteNode(watcher, zNode);
169    } catch (KeeperException e) {
170      if (e instanceof KeeperException.NoNodeException) {
171        // If the node does not exist, it could be already deleted. Continue without fail.
172        LOG.warn("The ZNode " + zNode + " for namespace " + name + " does not exist.");
173      } else {
174        LOG.error("Failed updating permissions for namespace " + name, e);
175        throw new IOException("Failed updating permissions for namespace " + name, e);
176      }
177    }
178  }
179
180  private void writeNamespace(NamespaceDescriptor ns) throws IOException {
181    String zNode = ZNodePaths.joinZNode(nsZNode, ns.getName());
182    try {
183      ZKUtil.createWithParents(watcher, zNode);
184      ZKUtil.updateExistingNodeData(watcher, zNode,
185        ProtobufUtil.toProtoNamespaceDescriptor(ns).toByteArray(), -1);
186    } catch (KeeperException e) {
187      LOG.error("Failed updating permissions for namespace " + ns.getName(), e);
188      throw new IOException("Failed updating permissions for namespace " + ns.getName(), e);
189    }
190  }
191
192  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) throws IOException {
193    for (ZKUtil.NodeAndData n : nodes) {
194      if (n.isEmpty()) continue;
195      String path = n.getNode();
196      String namespace = ZKUtil.getNodeName(path);
197      byte[] nodeData = n.getData();
198      if (LOG.isTraceEnabled()) {
199        LOG.trace("Updating namespace cache from node " + namespace + " with data: "
200          + Bytes.toStringBinary(nodeData));
201      }
202      NamespaceDescriptor ns =
203        ProtobufUtil.toNamespaceDescriptor(HBaseProtos.NamespaceDescriptor.parseFrom(nodeData));
204      cache.put(ns.getName(), ns);
205    }
206  }
207}