View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.zookeeper;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.Arrays;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.CoordinatedStateException;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.TableStateManager;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.exceptions.DeserializationException;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
39  import org.apache.zookeeper.KeeperException;
40  
41  /**
42   * Implementation of TableStateManager which reads, caches and sets state
43   * up in ZooKeeper.  If multiple read/write clients, will make for confusion.
44   * Code running on client side without consensus context should use
45   * {@link ZKTableStateClientSideReader} instead.
46   *
47   * <p>To save on trips to the zookeeper ensemble, internally we cache table
48   * state.
49   */
50  @InterfaceAudience.Private
51  public class ZKTableStateManager implements TableStateManager {
52    // A znode will exist under the table directory if it is in any of the
53    // following states: {@link TableState#ENABLING} , {@link TableState#DISABLING},
54    // or {@link TableState#DISABLED}.  If {@link TableState#ENABLED}, there will
55    // be no entry for a table in zk.  Thats how it currently works.
56  
57    private static final Log LOG = LogFactory.getLog(ZKTableStateManager.class);
58    private final ZooKeeperWatcher watcher;
59  
60    /**
61     * Cache of what we found in zookeeper so we don't have to go to zk ensemble
62     * for every query.  Synchronize access rather than use concurrent Map because
63     * synchronization needs to span query of zk.
64     */
65    private final Map<TableName, ZooKeeperProtos.Table.State> cache =
66      new HashMap<TableName, ZooKeeperProtos.Table.State>();
67  
68    public ZKTableStateManager(final ZooKeeperWatcher zkw) throws KeeperException,
69        InterruptedException {
70      super();
71      this.watcher = zkw;
72      populateTableStates();
73    }
74  
75    /**
76     * Gets a list of all the tables set as disabled in zookeeper.
77     * @throws KeeperException, InterruptedException
78     */
79    private void populateTableStates() throws KeeperException, InterruptedException {
80      synchronized (this.cache) {
81        List<String> children = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode);
82        if (children == null) return;
83        for (String child: children) {
84          TableName tableName = TableName.valueOf(child);
85          ZooKeeperProtos.Table.State state = getTableState(this.watcher, tableName);
86          if (state != null) this.cache.put(tableName, state);
87        }
88      }
89    }
90  
91    /**
92     * Sets table state in ZK. Sets no watches.
93     *
94     * {@inheritDoc}
95     */
96    @Override
97    public void setTableState(TableName tableName, ZooKeeperProtos.Table.State state)
98    throws CoordinatedStateException {
99      synchronized (this.cache) {
100       LOG.info("Moving table " + tableName + " state from " + this.cache.get(tableName)
101         + " to " + state);
102       try {
103         setTableStateInZK(tableName, state);
104       } catch (KeeperException e) {
105         throw new CoordinatedStateException(e);
106       }
107     }
108   }
109 
110   /**
111    * Checks and sets table state in ZK. Sets no watches.
112    * {@inheritDoc}
113    */
114   @Override
115   public boolean setTableStateIfInStates(TableName tableName,
116                                          ZooKeeperProtos.Table.State newState,
117                                          ZooKeeperProtos.Table.State... states)
118       throws CoordinatedStateException {
119     synchronized (this.cache) {
120       // Transition ENABLED->DISABLING has to be performed with a hack, because
121       // we treat empty state as enabled in this case because 0.92- clusters.
122       if (
123           (newState == ZooKeeperProtos.Table.State.DISABLING) &&
124                this.cache.get(tableName) != null && !isTableState(tableName, states) ||
125           (newState != ZooKeeperProtos.Table.State.DISABLING &&
126                !isTableState(tableName, states) )) {
127         return false;
128       }
129       try {
130         setTableStateInZK(tableName, newState);
131       } catch (KeeperException e) {
132         throw new CoordinatedStateException(e);
133       }
134       return true;
135     }
136   }
137 
138   /**
139    * Checks and sets table state in ZK. Sets no watches.
140    * {@inheritDoc}
141    */
142   @Override
143   public boolean setTableStateIfNotInStates(TableName tableName,
144                                             ZooKeeperProtos.Table.State newState,
145                                             ZooKeeperProtos.Table.State... states)
146     throws CoordinatedStateException {
147     synchronized (this.cache) {
148       if (isTableState(tableName, states)) {
149         // If the table is in the one of the states from the states list, the cache
150         // might be out-of-date, try to find it out from the master source (zookeeper server).
151         //
152         // Note: this adds extra zookeeper server calls and might have performance impact.
153         // However, this is not the happy path so we should not reach here often. Therefore,
154         // the performance impact should be minimal to none.
155         try {
156           ZooKeeperProtos.Table.State curstate = getTableState(watcher, tableName);
157 
158           if (isTableInState(Arrays.asList(states), curstate)) {
159             return false;
160           }
161         } catch (KeeperException e) {
162           throw new CoordinatedStateException(e);
163         } catch (InterruptedException e) {
164           throw new CoordinatedStateException(e);
165         }
166       }
167       try {
168         setTableStateInZK(tableName, newState);
169       } catch (KeeperException e) {
170         throw new CoordinatedStateException(e);
171       }
172       return true;
173     }
174   }
175 
176   private void setTableStateInZK(final TableName tableName,
177                                  final ZooKeeperProtos.Table.State state)
178       throws KeeperException {
179     String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString());
180     if (ZKUtil.checkExists(this.watcher, znode) == -1) {
181       ZKUtil.createAndFailSilent(this.watcher, znode);
182     }
183     synchronized (this.cache) {
184       ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
185       builder.setState(state);
186       byte [] data = ProtobufUtil.prependPBMagic(builder.build().toByteArray());
187       ZKUtil.setData(this.watcher, znode, data);
188       this.cache.put(tableName, state);
189     }
190   }
191 
192   /**
193    * Checks if table is marked in specified state in ZK (using cache only). {@inheritDoc}
194    */
195   @Override
196   public boolean isTableState(final TableName tableName,
197       final ZooKeeperProtos.Table.State... states) {
198     return isTableState(tableName, false, states); // only check cache
199   }
200 
201   /**
202    * Checks if table is marked in specified state in ZK. {@inheritDoc}
203    */
204   @Override
205   public boolean isTableState(final TableName tableName, final boolean checkSource,
206       final ZooKeeperProtos.Table.State... states) {
207     boolean isTableInSpecifiedState;
208     synchronized (this.cache) {
209       ZooKeeperProtos.Table.State currentState = this.cache.get(tableName);
210       if (checkSource) {
211         // The cache might be out-of-date, try to find it out from the master source (zookeeper
212         // server) and update the cache.
213         try {
214           ZooKeeperProtos.Table.State stateInZK = getTableState(watcher, tableName);
215 
216           if (currentState != stateInZK) {
217             if (stateInZK != null) {
218               this.cache.put(tableName, stateInZK);
219             } else {
220               this.cache.remove(tableName);
221             }
222             currentState = stateInZK;
223           }
224         } catch (KeeperException | InterruptedException e) {
225           // Contacting zookeeper failed.  Let us just trust the value in cache.
226         }
227       }
228       return isTableInState(Arrays.asList(states), currentState);
229     }
230   }
231 
232   /**
233    * Deletes the table in zookeeper. Fails silently if the table is not currently disabled in
234    * zookeeper. Sets no watches. {@inheritDoc}
235    */
236   @Override
237   public void setDeletedTable(final TableName tableName)
238   throws CoordinatedStateException {
239     synchronized (this.cache) {
240       if (this.cache.remove(tableName) == null) {
241         LOG.warn("Moving table " + tableName + " state to deleted but was already deleted");
242       }
243       try {
244         ZKUtil.deleteNodeFailSilent(this.watcher,
245           ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
246       } catch (KeeperException e) {
247         throw new CoordinatedStateException(e);
248       }
249     }
250   }
251 
252   /**
253    * check if table is present.
254    *
255    * @param tableName table we're working on
256    * @return true if the table is present
257    */
258   @Override
259   public boolean isTablePresent(final TableName tableName) {
260     synchronized (this.cache) {
261       ZooKeeperProtos.Table.State state = this.cache.get(tableName);
262       return !(state == null);
263     }
264   }
265 
266   /**
267    * Gets a list of all the tables set as disabling in zookeeper.
268    * @return Set of disabling tables, empty Set if none
269    * @throws CoordinatedStateException if error happened in underlying coordination engine
270    */
271   @Override
272   public Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... states)
273     throws InterruptedIOException, CoordinatedStateException {
274     try {
275       return getAllTables(states);
276     } catch (KeeperException e) {
277       throw new CoordinatedStateException(e);
278     }
279   }
280 
281   /**
282    * {@inheritDoc}
283    */
284   @Override
285   public void checkAndRemoveTableState(TableName tableName, ZooKeeperProtos.Table.State states,
286                                        boolean deletePermanentState)
287       throws CoordinatedStateException {
288     synchronized (this.cache) {
289       if (isTableState(tableName, states)) {
290         this.cache.remove(tableName);
291         if (deletePermanentState) {
292           try {
293             ZKUtil.deleteNodeFailSilent(this.watcher,
294                 ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
295           } catch (KeeperException e) {
296             throw new CoordinatedStateException(e);
297           }
298         }
299       }
300     }
301   }
302 
303   /**
304    * Gets a list of all the tables of specified states in zookeeper.
305    * @return Set of tables of specified states, empty Set if none
306    * @throws KeeperException
307    */
308   Set<TableName> getAllTables(final ZooKeeperProtos.Table.State... states)
309       throws KeeperException, InterruptedIOException {
310 
311     Set<TableName> allTables = new HashSet<TableName>();
312     List<String> children =
313       ZKUtil.listChildrenNoWatch(watcher, watcher.tableZNode);
314     if(children == null) return allTables;
315     for (String child: children) {
316       TableName tableName = TableName.valueOf(child);
317       ZooKeeperProtos.Table.State state;
318       try {
319         state = getTableState(watcher, tableName);
320       } catch (InterruptedException e) {
321         throw new InterruptedIOException();
322       }
323       for (ZooKeeperProtos.Table.State expectedState: states) {
324         if (state == expectedState) {
325           allTables.add(tableName);
326           break;
327         }
328       }
329     }
330     return allTables;
331   }
332 
333   /**
334    * Gets table state from ZK.
335    * @param zkw ZooKeeperWatcher instance to use
336    * @param tableName table we're checking
337    * @return Null or {@link ZooKeeperProtos.Table.State} found in znode.
338    * @throws KeeperException
339    */
340   private ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw,
341                                                    final TableName tableName)
342     throws KeeperException, InterruptedException {
343     String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString());
344     byte [] data = ZKUtil.getData(zkw, znode);
345     if (data == null || data.length <= 0) return null;
346     try {
347       ProtobufUtil.expectPBMagicPrefix(data);
348       ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
349       int magicLen = ProtobufUtil.lengthOfPBMagic();
350       ProtobufUtil.mergeFrom(builder, data, magicLen, data.length - magicLen);
351       return builder.getState();
352     } catch (IOException e) {
353       KeeperException ke = new KeeperException.DataInconsistencyException();
354       ke.initCause(e);
355       throw ke;
356     } catch (DeserializationException e) {
357       throw ZKUtil.convert(e);
358     }
359   }
360 
361   /**
362    * @return true if current state isn't null and is contained
363    * in the list of expected states.
364    */
365   private boolean isTableInState(final List<ZooKeeperProtos.Table.State> expectedStates,
366                        final ZooKeeperProtos.Table.State currentState) {
367     return currentState != null && expectedStates.contains(currentState);
368   }
369 }