1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.Arrays;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.CoordinatedStateException;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.TableStateManager;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.exceptions.DeserializationException;
37 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
39 import org.apache.zookeeper.KeeperException;
40
41
42
43
44
45
46
47
48
49
50 @InterfaceAudience.Private
51 public class ZKTableStateManager implements TableStateManager {
52
53
54
55
56
57 private static final Log LOG = LogFactory.getLog(ZKTableStateManager.class);
58 private final ZooKeeperWatcher watcher;
59
60
61
62
63
64
65 private final Map<TableName, ZooKeeperProtos.Table.State> cache =
66 new HashMap<TableName, ZooKeeperProtos.Table.State>();
67
68 public ZKTableStateManager(final ZooKeeperWatcher zkw) throws KeeperException,
69 InterruptedException {
70 super();
71 this.watcher = zkw;
72 populateTableStates();
73 }
74
75
76
77
78
79 private void populateTableStates() throws KeeperException, InterruptedException {
80 synchronized (this.cache) {
81 List<String> children = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode);
82 if (children == null) return;
83 for (String child: children) {
84 TableName tableName = TableName.valueOf(child);
85 ZooKeeperProtos.Table.State state = getTableState(this.watcher, tableName);
86 if (state != null) this.cache.put(tableName, state);
87 }
88 }
89 }
90
91
92
93
94
95
96 @Override
97 public void setTableState(TableName tableName, ZooKeeperProtos.Table.State state)
98 throws CoordinatedStateException {
99 synchronized (this.cache) {
100 LOG.info("Moving table " + tableName + " state from " + this.cache.get(tableName)
101 + " to " + state);
102 try {
103 setTableStateInZK(tableName, state);
104 } catch (KeeperException e) {
105 throw new CoordinatedStateException(e);
106 }
107 }
108 }
109
110
111
112
113
114 @Override
115 public boolean setTableStateIfInStates(TableName tableName,
116 ZooKeeperProtos.Table.State newState,
117 ZooKeeperProtos.Table.State... states)
118 throws CoordinatedStateException {
119 synchronized (this.cache) {
120
121
122 if (
123 (newState == ZooKeeperProtos.Table.State.DISABLING) &&
124 this.cache.get(tableName) != null && !isTableState(tableName, states) ||
125 (newState != ZooKeeperProtos.Table.State.DISABLING &&
126 !isTableState(tableName, states) )) {
127 return false;
128 }
129 try {
130 setTableStateInZK(tableName, newState);
131 } catch (KeeperException e) {
132 throw new CoordinatedStateException(e);
133 }
134 return true;
135 }
136 }
137
138
139
140
141
142 @Override
143 public boolean setTableStateIfNotInStates(TableName tableName,
144 ZooKeeperProtos.Table.State newState,
145 ZooKeeperProtos.Table.State... states)
146 throws CoordinatedStateException {
147 synchronized (this.cache) {
148 if (isTableState(tableName, states)) {
149
150
151
152
153
154
155 try {
156 ZooKeeperProtos.Table.State curstate = getTableState(watcher, tableName);
157
158 if (isTableInState(Arrays.asList(states), curstate)) {
159 return false;
160 }
161 } catch (KeeperException e) {
162 throw new CoordinatedStateException(e);
163 } catch (InterruptedException e) {
164 throw new CoordinatedStateException(e);
165 }
166 }
167 try {
168 setTableStateInZK(tableName, newState);
169 } catch (KeeperException e) {
170 throw new CoordinatedStateException(e);
171 }
172 return true;
173 }
174 }
175
176 private void setTableStateInZK(final TableName tableName,
177 final ZooKeeperProtos.Table.State state)
178 throws KeeperException {
179 String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString());
180 if (ZKUtil.checkExists(this.watcher, znode) == -1) {
181 ZKUtil.createAndFailSilent(this.watcher, znode);
182 }
183 synchronized (this.cache) {
184 ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
185 builder.setState(state);
186 byte [] data = ProtobufUtil.prependPBMagic(builder.build().toByteArray());
187 ZKUtil.setData(this.watcher, znode, data);
188 this.cache.put(tableName, state);
189 }
190 }
191
192
193
194
195 @Override
196 public boolean isTableState(final TableName tableName,
197 final ZooKeeperProtos.Table.State... states) {
198 return isTableState(tableName, false, states);
199 }
200
201
202
203
204 @Override
205 public boolean isTableState(final TableName tableName, final boolean checkSource,
206 final ZooKeeperProtos.Table.State... states) {
207 boolean isTableInSpecifiedState;
208 synchronized (this.cache) {
209 ZooKeeperProtos.Table.State currentState = this.cache.get(tableName);
210 if (checkSource) {
211
212
213 try {
214 ZooKeeperProtos.Table.State stateInZK = getTableState(watcher, tableName);
215
216 if (currentState != stateInZK) {
217 if (stateInZK != null) {
218 this.cache.put(tableName, stateInZK);
219 } else {
220 this.cache.remove(tableName);
221 }
222 currentState = stateInZK;
223 }
224 } catch (KeeperException | InterruptedException e) {
225
226 }
227 }
228 return isTableInState(Arrays.asList(states), currentState);
229 }
230 }
231
232
233
234
235
236 @Override
237 public void setDeletedTable(final TableName tableName)
238 throws CoordinatedStateException {
239 synchronized (this.cache) {
240 if (this.cache.remove(tableName) == null) {
241 LOG.warn("Moving table " + tableName + " state to deleted but was already deleted");
242 }
243 try {
244 ZKUtil.deleteNodeFailSilent(this.watcher,
245 ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
246 } catch (KeeperException e) {
247 throw new CoordinatedStateException(e);
248 }
249 }
250 }
251
252
253
254
255
256
257
258 @Override
259 public boolean isTablePresent(final TableName tableName) {
260 synchronized (this.cache) {
261 ZooKeeperProtos.Table.State state = this.cache.get(tableName);
262 return !(state == null);
263 }
264 }
265
266
267
268
269
270
271 @Override
272 public Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... states)
273 throws InterruptedIOException, CoordinatedStateException {
274 try {
275 return getAllTables(states);
276 } catch (KeeperException e) {
277 throw new CoordinatedStateException(e);
278 }
279 }
280
281
282
283
284 @Override
285 public void checkAndRemoveTableState(TableName tableName, ZooKeeperProtos.Table.State states,
286 boolean deletePermanentState)
287 throws CoordinatedStateException {
288 synchronized (this.cache) {
289 if (isTableState(tableName, states)) {
290 this.cache.remove(tableName);
291 if (deletePermanentState) {
292 try {
293 ZKUtil.deleteNodeFailSilent(this.watcher,
294 ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
295 } catch (KeeperException e) {
296 throw new CoordinatedStateException(e);
297 }
298 }
299 }
300 }
301 }
302
303
304
305
306
307
308 Set<TableName> getAllTables(final ZooKeeperProtos.Table.State... states)
309 throws KeeperException, InterruptedIOException {
310
311 Set<TableName> allTables = new HashSet<TableName>();
312 List<String> children =
313 ZKUtil.listChildrenNoWatch(watcher, watcher.tableZNode);
314 if(children == null) return allTables;
315 for (String child: children) {
316 TableName tableName = TableName.valueOf(child);
317 ZooKeeperProtos.Table.State state;
318 try {
319 state = getTableState(watcher, tableName);
320 } catch (InterruptedException e) {
321 throw new InterruptedIOException();
322 }
323 for (ZooKeeperProtos.Table.State expectedState: states) {
324 if (state == expectedState) {
325 allTables.add(tableName);
326 break;
327 }
328 }
329 }
330 return allTables;
331 }
332
333
334
335
336
337
338
339
340 private ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw,
341 final TableName tableName)
342 throws KeeperException, InterruptedException {
343 String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString());
344 byte [] data = ZKUtil.getData(zkw, znode);
345 if (data == null || data.length <= 0) return null;
346 try {
347 ProtobufUtil.expectPBMagicPrefix(data);
348 ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
349 int magicLen = ProtobufUtil.lengthOfPBMagic();
350 ProtobufUtil.mergeFrom(builder, data, magicLen, data.length - magicLen);
351 return builder.getState();
352 } catch (IOException e) {
353 KeeperException ke = new KeeperException.DataInconsistencyException();
354 ke.initCause(e);
355 throw ke;
356 } catch (DeserializationException e) {
357 throw ZKUtil.convert(e);
358 }
359 }
360
361
362
363
364
365 private boolean isTableInState(final List<ZooKeeperProtos.Table.State> expectedStates,
366 final ZooKeeperProtos.Table.State currentState) {
367 return currentState != null && expectedStates.contains(currentState);
368 }
369 }