1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import com.google.common.collect.Sets;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
26 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
27 import org.apache.hadoop.hbase.util.Bytes;
28 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
29 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
30 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
31 import org.apache.zookeeper.KeeperException;
32
33 import java.io.IOException;
34 import java.util.List;
35 import java.util.NavigableMap;
36 import java.util.NavigableSet;
37 import java.util.concurrent.ConcurrentSkipListMap;
38
39
40
41
42
43
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class ZKNamespaceManager extends ZooKeeperListener {
51 private static final Log LOG = LogFactory.getLog(ZKNamespaceManager.class);
52 private final String nsZNode;
53 private volatile NavigableMap<String,NamespaceDescriptor> cache;
54
55 public ZKNamespaceManager(ZooKeeperWatcher zkw) throws IOException {
56 super(zkw);
57 nsZNode = ZooKeeperWatcher.namespaceZNode;
58 cache = new ConcurrentSkipListMap<String, NamespaceDescriptor>();
59 }
60
61 public void start() throws IOException {
62 watcher.registerListener(this);
63 try {
64 if (ZKUtil.watchAndCheckExists(watcher, nsZNode)) {
65 List<ZKUtil.NodeAndData> existing =
66 ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
67 if (existing != null) {
68 refreshNodes(existing);
69 }
70 } else {
71 ZKUtil.createWithParents(watcher, nsZNode);
72 }
73 } catch (KeeperException e) {
74 throw new IOException("Failed to initialize ZKNamespaceManager", e);
75 }
76 }
77
78 public NamespaceDescriptor get(String name) {
79 return cache.get(name);
80 }
81
82 public void update(NamespaceDescriptor ns) throws IOException {
83 writeNamespace(ns);
84 cache.put(ns.getName(), ns);
85 }
86
87 public void remove(String name) throws IOException {
88 deleteNamespace(name);
89 cache.remove(name);
90 }
91
92 public NavigableSet<NamespaceDescriptor> list() throws IOException {
93 NavigableSet<NamespaceDescriptor> ret =
94 Sets.newTreeSet(NamespaceDescriptor.NAMESPACE_DESCRIPTOR_COMPARATOR);
95 for(NamespaceDescriptor ns: cache.values()) {
96 ret.add(ns);
97 }
98 return ret;
99 }
100
101 @Override
102 public void nodeCreated(String path) {
103 if (nsZNode.equals(path)) {
104 try {
105 List<ZKUtil.NodeAndData> nodes =
106 ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
107 refreshNodes(nodes);
108 } catch (KeeperException ke) {
109 String msg = "Error reading data from zookeeper";
110 LOG.error(msg, ke);
111 watcher.abort(msg, ke);
112 } catch (IOException e) {
113 String msg = "Error parsing data from zookeeper";
114 LOG.error(msg, e);
115 watcher.abort(msg, e);
116 }
117 }
118 }
119
120 @Override
121 public void nodeDeleted(String path) {
122 if (nsZNode.equals(ZKUtil.getParent(path))) {
123 String nsName = ZKUtil.getNodeName(path);
124 cache.remove(nsName);
125 }
126 }
127
128 @Override
129 public void nodeDataChanged(String path) {
130 if (nsZNode.equals(ZKUtil.getParent(path))) {
131 try {
132 byte[] data = ZKUtil.getDataAndWatch(watcher, path);
133 NamespaceDescriptor ns =
134 ProtobufUtil.toNamespaceDescriptor(
135 HBaseProtos.NamespaceDescriptor.parseFrom(data));
136 cache.put(ns.getName(), ns);
137 } catch (KeeperException ke) {
138 String msg = "Error reading data from zookeeper for node "+path;
139 LOG.error(msg, ke);
140
141 watcher.abort(msg, ke);
142 } catch (IOException ioe) {
143 String msg = "Error deserializing namespace: "+path;
144 LOG.error(msg, ioe);
145 watcher.abort(msg, ioe);
146 }
147 }
148 }
149
150 @Override
151 public void nodeChildrenChanged(String path) {
152 if (nsZNode.equals(path)) {
153 try {
154 List<ZKUtil.NodeAndData> nodes =
155 ZKUtil.getChildDataAndWatchForNewChildren(watcher, nsZNode);
156 refreshNodes(nodes);
157 } catch (KeeperException ke) {
158 LOG.error("Error reading data from zookeeper for path "+path, ke);
159 watcher.abort("Zookeeper error get node children for path "+path, ke);
160 } catch (IOException e) {
161 LOG.error("Error deserializing namespace child from: "+path, e);
162 watcher.abort("Error deserializing namespace child from: " + path, e);
163 }
164 }
165 }
166
167 private void deleteNamespace(String name) throws IOException {
168 String zNode = ZKUtil.joinZNode(nsZNode, name);
169 try {
170 ZKUtil.deleteNode(watcher, zNode);
171 } catch (KeeperException e) {
172 LOG.error("Failed updating permissions for namespace "+name, e);
173 throw new IOException("Failed updating permissions for namespace "+name, e);
174 }
175 }
176
177 private void writeNamespace(NamespaceDescriptor ns) throws IOException {
178 String zNode = ZKUtil.joinZNode(nsZNode, ns.getName());
179 try {
180 ZKUtil.createWithParents(watcher, zNode);
181 ZKUtil.updateExistingNodeData(watcher, zNode,
182 ProtobufUtil.toProtoNamespaceDescriptor(ns).toByteArray(), -1);
183 } catch (KeeperException e) {
184 LOG.error("Failed updating permissions for namespace "+ns.getName(), e);
185 throw new IOException("Failed updating permissions for namespace "+ns.getName(), e);
186 }
187 }
188
189 private void refreshNodes(List<ZKUtil.NodeAndData> nodes) throws IOException {
190 for (ZKUtil.NodeAndData n : nodes) {
191 if (n.isEmpty()) continue;
192 String path = n.getNode();
193 String namespace = ZKUtil.getNodeName(path);
194 byte[] nodeData = n.getData();
195 if (LOG.isDebugEnabled()) {
196 LOG.debug("Updating namespace cache from node "+namespace+" with data: "+
197 Bytes.toStringBinary(nodeData));
198 }
199 NamespaceDescriptor ns =
200 ProtobufUtil.toNamespaceDescriptor(
201 HBaseProtos.NamespaceDescriptor.parseFrom(nodeData));
202 cache.put(ns.getName(), ns);
203 }
204 }
205 }