View Javadoc

1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.InterProcessLock;
30  import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
31  import org.apache.hadoop.hbase.InterProcessReadWriteLock;
32  import org.apache.hadoop.hbase.ServerName;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41  import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
42  import org.apache.zookeeper.KeeperException;
43  
44  /**
45   * A manager for distributed table level locks.
46   */
47  @InterfaceAudience.Private
48  public abstract class TableLockManager {
49  
50    private static final Log LOG = LogFactory.getLog(TableLockManager.class);
51  
52    /** Configuration key for enabling table-level locks for schema changes */
53    public static final String TABLE_LOCK_ENABLE =
54      "hbase.table.lock.enable";
55  
56    /** by default we should enable table-level locks for schema changes */
57    private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
58  
59    /** Configuration key for time out for trying to acquire table locks */
60    protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
61      "hbase.table.write.lock.timeout.ms";
62  
63    /** Configuration key for time out for trying to acquire table locks */
64    protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
65      "hbase.table.read.lock.timeout.ms";
66  
67    protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
68      600 * 1000; //10 min default
69  
70    protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
71      600 * 1000; //10 min default
72  
73    public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
74  
75    public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
76        600 * 1000; //10 min default
77  
78    /**
79     * A distributed lock for a table.
80     */
81    @InterfaceAudience.Private
82    public interface TableLock {
83      /**
84       * Acquire the lock, with the configured lock timeout.
85       * @throws LockTimeoutException If unable to acquire a lock within a specified
86       * time period (if any)
87       * @throws IOException If unrecoverable error occurs
88       */
89      void acquire() throws IOException;
90  
91      /**
92       * Release the lock already held.
93       * @throws IOException If there is an unrecoverable error releasing the lock
94       */
95      void release() throws IOException;
96    }
97  
98    /**
99     * Returns a TableLock for locking the table for exclusive access
100    * @param tableName Table to lock
101    * @param purpose Human readable reason for locking the table
102    * @return A new TableLock object for acquiring a write lock
103    */
104   public abstract TableLock writeLock(TableName tableName, String purpose);
105 
106   /**
107    * Returns a TableLock for locking the table for shared access among read-lock holders
108    * @param tableName Table to lock
109    * @param purpose Human readable reason for locking the table
110    * @return A new TableLock object for acquiring a read lock
111    */
112   public abstract TableLock readLock(TableName tableName, String purpose);
113 
114   /**
115    * Visits all table locks(read and write), and lock attempts with the given callback
116    * MetadataHandler.
117    * @param handler the metadata handler to call
118    * @throws IOException If there is an unrecoverable error
119    */
120   public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
121 
122   /**
123    * Force releases all table locks(read and write) that have been held longer than
124    * "hbase.table.lock.expire.ms". Assumption is that the clock skew between zookeeper
125    * and this servers is negligible.
126    * The behavior of the lock holders still thinking that they have the lock is undefined.
127    * @throws IOException If there is an unrecoverable error
128    */
129   public abstract void reapAllExpiredLocks() throws IOException;
130 
131   /**
132    * Force releases table write locks and lock attempts even if this thread does
133    * not own the lock. The behavior of the lock holders still thinking that they
134    * have the lock is undefined. This should be used carefully and only when
135    * we can ensure that all write-lock holders have died. For example if only
136    * the master can hold write locks, then we can reap it's locks when the backup
137    * master starts.
138    * @throws IOException If there is an unrecoverable error
139    */
140   public abstract void reapWriteLocks() throws IOException;
141 
142   /**
143    * Called after a table has been deleted, and after the table lock is  released.
144    * TableLockManager should do cleanup for the table state.
145    * @param tableName name of the table
146    * @throws IOException If there is an unrecoverable error releasing the lock
147    */
148   public abstract void tableDeleted(TableName tableName)
149       throws IOException;
150 
151   /**
152    * Creates and returns a TableLockManager according to the configuration
153    */
154   public static TableLockManager createTableLockManager(Configuration conf,
155       ZooKeeperWatcher zkWatcher, ServerName serverName) {
156     // Initialize table level lock manager for schema changes, if enabled.
157     if (conf.getBoolean(TABLE_LOCK_ENABLE,
158         DEFAULT_TABLE_LOCK_ENABLE)) {
159       long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
160           DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
161       long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
162           DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
163       long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
164           DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
165 
166       return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
167     }
168 
169     return new NullTableLockManager();
170   }
171 
172   /**
173    * A null implementation
174    */
175   @InterfaceAudience.Private
176   public static class NullTableLockManager extends TableLockManager {
177     static class NullTableLock implements TableLock {
178       @Override
179       public void acquire() throws IOException {
180       }
181       @Override
182       public void release() throws IOException {
183       }
184     }
185     @Override
186     public TableLock writeLock(TableName tableName, String purpose) {
187       return new NullTableLock();
188     }
189     @Override
190     public TableLock readLock(TableName tableName, String purpose) {
191       return new NullTableLock();
192     }
193     @Override
194     public void reapAllExpiredLocks() throws IOException {
195     }
196     @Override
197     public void reapWriteLocks() throws IOException {
198     }
199     @Override
200     public void tableDeleted(TableName tableName) throws IOException {
201     }
202     @Override
203     public void visitAllLocks(MetadataHandler handler) throws IOException {
204     }
205   }
206 
207   /** Public for hbck */
208   public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
209     int pblen = ProtobufUtil.lengthOfPBMagic();
210     if (bytes == null || bytes.length < pblen) {
211       return null;
212     }
213     try {
214       ZooKeeperProtos.TableLock.Builder builder = ZooKeeperProtos.TableLock.newBuilder();
215       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
216       return builder.build();
217     } catch (IOException ex) {
218       LOG.warn("Exception in deserialization", ex);
219     }
220     return null;
221   }
222 
223   /**
224    * ZooKeeper based TableLockManager
225    */
226   @InterfaceAudience.Private
227   private static class ZKTableLockManager extends TableLockManager {
228 
229     private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
230       @Override
231       public void handleMetadata(byte[] ownerMetadata) {
232         if (!LOG.isDebugEnabled()) {
233           return;
234         }
235         ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
236         if (data == null) {
237           return;
238         }
239         LOG.debug("Table is locked by " +
240             String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " +
241                 "purpose=%s, isShared=%s, createTime=%s]",
242                 data.getTableName().getNamespace().toStringUtf8(),
243                 data.getTableName().getQualifier().toStringUtf8(),
244                 ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
245                 data.getPurpose(), data.getIsShared(), data.getCreateTime()));
246       }
247     };
248 
249     private static class TableLockImpl implements TableLock {
250       long lockTimeoutMs;
251       TableName tableName;
252       InterProcessLock lock;
253       boolean isShared;
254       ZooKeeperWatcher zkWatcher;
255       ServerName serverName;
256       String purpose;
257 
258       public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher,
259           ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
260         this.tableName = tableName;
261         this.zkWatcher = zkWatcher;
262         this.serverName = serverName;
263         this.lockTimeoutMs = lockTimeoutMs;
264         this.isShared = isShared;
265         this.purpose = purpose;
266       }
267 
268       @Override
269       public void acquire() throws IOException {
270         if (LOG.isTraceEnabled()) {
271           LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") +
272             " lock on: " + tableName + " for:" + purpose);
273         }
274 
275         lock = createTableLock();
276         try {
277           if (lockTimeoutMs == -1) {
278             // Wait indefinitely
279             lock.acquire();
280           } else {
281             if (!lock.tryAcquire(lockTimeoutMs)) {
282               throw new LockTimeoutException("Timed out acquiring " +
283                 (isShared ? "read" : "write") + "lock for table:" + tableName +
284                 "for:" + purpose + " after " + lockTimeoutMs + " ms.");
285             }
286           }
287         } catch (InterruptedException e) {
288           LOG.warn("Interrupted acquiring a lock for " + tableName, e);
289           Thread.currentThread().interrupt();
290           throw new InterruptedIOException("Interrupted acquiring a lock");
291         }
292         if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write")
293             + " lock on " + tableName + " for " + purpose);
294       }
295 
296       @Override
297       public void release() throws IOException {
298         if (LOG.isTraceEnabled()) {
299           LOG.trace("Attempt to release table " + (isShared ? "read" : "write")
300               + " lock on " + tableName);
301         }
302         if (lock == null) {
303           throw new IllegalStateException("Table " + tableName +
304             " is not locked!");
305         }
306 
307         try {
308           lock.release();
309         } catch (InterruptedException e) {
310           LOG.warn("Interrupted while releasing a lock for " + tableName);
311           throw new InterruptedIOException();
312         }
313         if (LOG.isTraceEnabled()) {
314           LOG.trace("Released table lock on " + tableName);
315         }
316       }
317 
318       private InterProcessLock createTableLock() {
319         String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode,
320             tableName.getNameAsString());
321 
322         ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
323           .setTableName(ProtobufUtil.toProtoTableName(tableName))
324           .setLockOwner(ProtobufUtil.toServerName(serverName))
325           .setThreadId(Thread.currentThread().getId())
326           .setPurpose(purpose)
327           .setIsShared(isShared)
328           .setCreateTime(EnvironmentEdgeManager.currentTime()).build();
329         byte[] lockMetadata = toBytes(data);
330 
331         InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
332           METADATA_HANDLER);
333         return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
334       }
335     }
336 
337     private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
338       return ProtobufUtil.prependPBMagic(data.toByteArray());
339     }
340 
341     private final ServerName serverName;
342     private final ZooKeeperWatcher zkWatcher;
343     private final long writeLockTimeoutMs;
344     private final long readLockTimeoutMs;
345     private final long lockExpireTimeoutMs;
346 
347     /**
348      * Initialize a new manager for table-level locks.
349      * @param zkWatcher
350      * @param serverName Address of the server responsible for acquiring and
351      * releasing the table-level locks
352      * @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a
353      * given table, or -1 for no timeout
354      * @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a
355      * given table, or -1 for no timeout
356      */
357     public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
358       ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
359       this.zkWatcher = zkWatcher;
360       this.serverName = serverName;
361       this.writeLockTimeoutMs = writeLockTimeoutMs;
362       this.readLockTimeoutMs = readLockTimeoutMs;
363       this.lockExpireTimeoutMs = lockExpireTimeoutMs;
364     }
365 
366     @Override
367     public TableLock writeLock(TableName tableName, String purpose) {
368       return new TableLockImpl(tableName, zkWatcher,
369           serverName, writeLockTimeoutMs, false, purpose);
370     }
371 
372     public TableLock readLock(TableName tableName, String purpose) {
373       return new TableLockImpl(tableName, zkWatcher,
374           serverName, readLockTimeoutMs, true, purpose);
375     }
376 
377     public void visitAllLocks(MetadataHandler handler) throws IOException {
378       for (String tableName : getTableNames()) {
379         String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
380         ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
381             zkWatcher, tableLockZNode, null);
382         lock.readLock(null).visitLocks(handler);
383         lock.writeLock(null).visitLocks(handler);
384       }
385     }
386 
387     private List<String> getTableNames() throws IOException {
388 
389       List<String> tableNames;
390       try {
391         tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
392       } catch (KeeperException e) {
393         LOG.error("Unexpected ZooKeeper error when listing children", e);
394         throw new IOException("Unexpected ZooKeeper exception", e);
395       }
396       return tableNames;
397     }
398 
399     @Override
400     public void reapWriteLocks() throws IOException {
401       //get the table names
402       try {
403         for (String tableName : getTableNames()) {
404           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
405           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
406               zkWatcher, tableLockZNode, null);
407           lock.writeLock(null).reapAllLocks();
408         }
409       } catch (IOException ex) {
410         throw ex;
411       } catch (Exception ex) {
412         LOG.warn("Caught exception while reaping table write locks", ex);
413       }
414     }
415 
416     @Override
417     public void reapAllExpiredLocks() throws IOException {
418       //get the table names
419       try {
420         for (String tableName : getTableNames()) {
421           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
422           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
423               zkWatcher, tableLockZNode, null);
424           lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
425           lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
426         }
427       } catch (IOException ex) {
428         throw ex;
429       } catch (Exception ex) {
430         throw new IOException(ex);
431       }
432     }
433 
434     @Override
435     public void tableDeleted(TableName tableName) throws IOException {
436       //table write lock from DeleteHandler is already released, just delete the parent znode
437       String tableNameStr = tableName.getNameAsString();
438       String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
439       try {
440         ZKUtil.deleteNode(zkWatcher, tableLockZNode);
441       } catch (KeeperException ex) {
442         if (ex.code() == KeeperException.Code.NOTEMPTY) {
443           //we might get this in rare occasions where a CREATE table or some other table operation
444           //is waiting to acquire the lock. In this case, parent znode won't be deleted.
445           LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
446               + tableLockZNode);
447           return;
448         }
449         throw new IOException(ex);
450       }
451     }
452   }
453 }