001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.NavigableMap;
024import java.util.NavigableSet;
025import java.util.concurrent.ConcurrentSkipListMap;
026
027import org.apache.hadoop.hbase.util.Bytes;
028import org.apache.hadoop.hbase.zookeeper.ZKListener;
029import org.apache.hadoop.hbase.zookeeper.ZKUtil;
030import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
031import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.apache.zookeeper.KeeperException;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
040
041/**
042 * Class servers two purposes:
043 *
044 * 1. Broadcast NamespaceDescriptor information via ZK
045 * (Done by the Master)
046 * 2. Consume broadcasted NamespaceDescriptor changes
047 * (Done by the RegionServers)
048 *
049 */
050@InterfaceAudience.Private
051public class ZKNamespaceManager extends ZKListener {
052  private static final Logger LOG = LoggerFactory.getLogger(ZKNamespaceManager.class);
053  private final String nsZNode;
054  private final NavigableMap<String,NamespaceDescriptor> cache;
055
056  public ZKNamespaceManager(ZKWatcher zkw) throws IOException {
057    super(zkw);
058    nsZNode = zkw.getZNodePaths().namespaceZNode;
059    cache = new ConcurrentSkipListMap<>();
060  }
061
062  public void start() throws IOException {
063    watcher.registerListener(this);
064    try {
065      if (ZKUtil.watchAndCheckExists(watcher, nsZNode)) {
066        List<ZKUtil.NodeAndData> existing =
067            ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
068        if (existing != null) {
069          refreshNodes(existing);
070        }
071      } else {
072        ZKUtil.createWithParents(watcher, nsZNode);
073      }
074    } catch (KeeperException e) {
075      throw new IOException("Failed to initialize ZKNamespaceManager", e);
076    }
077  }
078
079  public void stop() throws IOException {
080    this.watcher.unregisterListener(this);
081  }
082
083  public NamespaceDescriptor get(String name) {
084    return cache.get(name);
085  }
086
087  public void update(NamespaceDescriptor ns) throws IOException {
088    writeNamespace(ns);
089    cache.put(ns.getName(), ns);
090  }
091
092  public void remove(String name) throws IOException {
093    deleteNamespace(name);
094    cache.remove(name);
095  }
096
097  public NavigableSet<NamespaceDescriptor> list() throws IOException {
098    NavigableSet<NamespaceDescriptor> ret =
099        Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR);
100    for(NamespaceDescriptor ns: cache.values()) {
101      ret.add(ns);
102    }
103    return ret;
104  }
105
106  @Override
107  public void nodeCreated(String path) {
108    if (nsZNode.equals(path)) {
109      try {
110        List<ZKUtil.NodeAndData> nodes =
111            ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
112        refreshNodes(nodes);
113      } catch (KeeperException ke) {
114        String msg = "Error reading data from zookeeper";
115        LOG.error(msg, ke);
116        watcher.abort(msg, ke);
117      } catch (IOException e) {
118        String msg = "Error parsing data from zookeeper";
119        LOG.error(msg, e);
120        watcher.abort(msg, e);
121      }
122    }
123  }
124
125  @Override
126  public void nodeDeleted(String path) {
127    if (nsZNode.equals(ZKUtil.getParent(path))) {
128      String nsName = ZKUtil.getNodeName(path);
129      cache.remove(nsName);
130    }
131  }
132
133  @Override
134  public void nodeDataChanged(String path) {
135    if (nsZNode.equals(ZKUtil.getParent(path))) {
136      try {
137        byte[] data = ZKUtil.getDataAndWatch(watcher, path);
138        NamespaceDescriptor ns =
139            ProtobufUtil.toNamespaceDescriptor(
140                HBaseProtos.NamespaceDescriptor.parseFrom(data));
141        cache.put(ns.getName(), ns);
142      } catch (KeeperException ke) {
143        String msg = "Error reading data from zookeeper for node "+path;
144        LOG.error(msg, ke);
145        // only option is to abort
146        watcher.abort(msg, ke);
147      } catch (IOException ioe) {
148        String msg = "Error deserializing namespace: "+path;
149        LOG.error(msg, ioe);
150        watcher.abort(msg, ioe);
151      }
152    }
153  }
154
155  @Override
156  public void nodeChildrenChanged(String path) {
157    if (nsZNode.equals(path)) {
158      try {
159        List<ZKUtil.NodeAndData> nodes =
160            ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
161        refreshNodes(nodes);
162      } catch (KeeperException ke) {
163        LOG.error("Error reading data from zookeeper for path "+path, ke);
164        watcher.abort("ZooKeeper error get node children for path "+path, ke);
165      } catch (IOException e) {
166        LOG.error("Error deserializing namespace child from: "+path, e);
167        watcher.abort("Error deserializing namespace child from: " + path, e);
168      }
169    }
170  }
171
172  private void deleteNamespace(String name) throws IOException {
173    String zNode = ZNodePaths.joinZNode(nsZNode, name);
174    try {
175      ZKUtil.deleteNode(watcher, zNode);
176    } catch (KeeperException e) {
177      if (e instanceof KeeperException.NoNodeException) {
178        // If the node does not exist, it could be already deleted. Continue without fail.
179        LOG.warn("The ZNode " + zNode + " for namespace " + name + " does not exist.");
180      } else {
181        LOG.error("Failed updating permissions for namespace " + name, e);
182        throw new IOException("Failed updating permissions for namespace " + name, e);
183      }
184    }
185  }
186
187  private void writeNamespace(NamespaceDescriptor ns) throws IOException {
188    String zNode = ZNodePaths.joinZNode(nsZNode, ns.getName());
189    try {
190      ZKUtil.createWithParents(watcher, zNode);
191      ZKUtil.updateExistingNodeData(watcher, zNode,
192          ProtobufUtil.toProtoNamespaceDescriptor(ns).toByteArray(), -1);
193    } catch (KeeperException e) {
194      LOG.error("Failed updating permissions for namespace "+ns.getName(), e);
195      throw new IOException("Failed updating permissions for namespace "+ns.getName(), e);
196    }
197  }
198
199  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) throws IOException {
200    for (ZKUtil.NodeAndData n : nodes) {
201      if (n.isEmpty()) continue;
202      String path = n.getNode();
203      String namespace = ZKUtil.getNodeName(path);
204      byte[] nodeData = n.getData();
205      if (LOG.isTraceEnabled()) {
206        LOG.trace("Updating namespace cache from node " + namespace + " with data: " +
207            Bytes.toStringBinary(nodeData));
208      }
209      NamespaceDescriptor ns =
210          ProtobufUtil.toNamespaceDescriptor(
211              HBaseProtos.NamespaceDescriptor.parseFrom(nodeData));
212      cache.put(ns.getName(), ns);
213    }
214  }
215}