View Javadoc

1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.InterProcessLock;
30  import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
31  import org.apache.hadoop.hbase.InterProcessReadWriteLock;
32  import org.apache.hadoop.hbase.ServerName;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41  import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
42  import org.apache.zookeeper.KeeperException;
43  
44  import com.google.protobuf.InvalidProtocolBufferException;
45  
46  /**
47   * A manager for distributed table level locks.
48   */
49  @InterfaceAudience.Private
50  public abstract class TableLockManager {
51  
  /** Logger shared by this class and its nested implementations. */
  private static final Log LOG = LogFactory.getLog(TableLockManager.class);

  /** Configuration key for enabling table-level locks for schema changes */
  public static final String TABLE_LOCK_ENABLE =
    "hbase.table.lock.enable";

  /** by default we should enable table-level locks for schema changes */
  private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;

  /**
   * Configuration key for the timeout, in milliseconds, for trying to acquire table
   * write locks. A value of -1 makes the ZooKeeper based lock wait indefinitely.
   */
  protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
    "hbase.table.write.lock.timeout.ms";

  /**
   * Configuration key for the timeout, in milliseconds, for trying to acquire table
   * read locks. A value of -1 makes the ZooKeeper based lock wait indefinitely.
   */
  protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
    "hbase.table.read.lock.timeout.ms";

  /** Write lock acquisition timeout used when {@link #TABLE_WRITE_LOCK_TIMEOUT_MS} is not set */
  protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
    600 * 1000; //10 min default

  /** Read lock acquisition timeout used when {@link #TABLE_READ_LOCK_TIMEOUT_MS} is not set */
  protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
    600 * 1000; //10 min default

  /**
   * Configuration key for the lock expiry timeout, in milliseconds, handed to
   * {@link #reapAllExpiredLocks()} implementations.
   */
  public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";

  /** Lock expiry timeout used when {@link #TABLE_LOCK_EXPIRE_TIMEOUT} is not set */
  public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
      600 * 1000; //10 min default
79  
  /**
   * A distributed lock for a table. Instances are obtained from
   * {@link TableLockManager#writeLock(TableName, String)} or
   * {@link TableLockManager#readLock(TableName, String)}.
   */
  @InterfaceAudience.Private
  public interface TableLock {
    /**
     * Acquire the lock, with the configured lock timeout.
     * @throws LockTimeoutException If unable to acquire a lock within a specified
     * time period (if any)
     * @throws IOException If an unrecoverable error occurs
     */
    void acquire() throws IOException;

    /**
     * Release the lock already held.
     * @throws IOException If there is an unrecoverable error releasing the lock
     */
    void release() throws IOException;
  }
99  
  /**
   * Returns a TableLock for locking the table for exclusive access. The caller
   * takes the lock by calling {@link TableLock#acquire()} on the returned object.
   * @param tableName Table to lock
   * @param purpose Human readable reason for locking the table
   * @return A new TableLock object for acquiring a write lock
   */
  public abstract TableLock writeLock(TableName tableName, String purpose);
107 
  /**
   * Returns a TableLock for locking the table for shared access among read-lock holders.
   * The caller takes the lock by calling {@link TableLock#acquire()} on the returned object.
   * @param tableName Table to lock
   * @param purpose Human readable reason for locking the table
   * @return A new TableLock object for acquiring a read lock
   */
  public abstract TableLock readLock(TableName tableName, String purpose);
115 
  /**
   * Visits all table locks (read and write) and lock attempts, passing them to the
   * given MetadataHandler callback.
   * @param handler the metadata handler to call
   * @throws IOException If there is an unrecoverable error
   */
  public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
123 
  /**
   * Force releases all table locks (read and write) that have been held longer than
   * "hbase.table.lock.expire.ms". The assumption is that the clock skew between ZooKeeper
   * and this server is negligible.
   * The behavior of the lock holders still thinking that they have the lock is undefined.
   * @throws IOException If there is an unrecoverable error
   */
  public abstract void reapAllExpiredLocks() throws IOException;
132 
  /**
   * Force releases table write locks and lock attempts even if this thread does
   * not own the lock. The behavior of the lock holders still thinking that they
   * have the lock is undefined. This should be used carefully and only when
   * we can ensure that all write-lock holders have died. For example, if only
   * the master can hold write locks, then we can reap its locks when the backup
   * master starts.
   * @throws IOException If there is an unrecoverable error
   */
  public abstract void reapWriteLocks() throws IOException;
143 
  /**
   * Called after a table has been deleted, and after the table lock is released.
   * TableLockManager should do cleanup for the table state.
   * @param tableName name of the table
   * @throws IOException If there is an unrecoverable error cleaning up the table's lock state
   */
  public abstract void tableDeleted(TableName tableName)
      throws IOException;
152 
153   /**
154    * Creates and returns a TableLockManager according to the configuration
155    */
156   public static TableLockManager createTableLockManager(Configuration conf,
157       ZooKeeperWatcher zkWatcher, ServerName serverName) {
158     // Initialize table level lock manager for schema changes, if enabled.
159     if (conf.getBoolean(TABLE_LOCK_ENABLE,
160         DEFAULT_TABLE_LOCK_ENABLE)) {
161       long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
162           DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
163       long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
164           DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
165       long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
166           DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
167 
168       return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
169     }
170 
171     return new NullTableLockManager();
172   }
173 
174   /**
175    * A null implementation
176    */
177   @InterfaceAudience.Private
178   public static class NullTableLockManager extends TableLockManager {
179     static class NullTableLock implements TableLock {
180       @Override
181       public void acquire() throws IOException {
182       }
183       @Override
184       public void release() throws IOException {
185       }
186     }
187     @Override
188     public TableLock writeLock(TableName tableName, String purpose) {
189       return new NullTableLock();
190     }
191     @Override
192     public TableLock readLock(TableName tableName, String purpose) {
193       return new NullTableLock();
194     }
195     @Override
196     public void reapAllExpiredLocks() throws IOException {
197     }
198     @Override
199     public void reapWriteLocks() throws IOException {
200     }
201     @Override
202     public void tableDeleted(TableName tableName) throws IOException {
203     }
204     @Override
205     public void visitAllLocks(MetadataHandler handler) throws IOException {
206     }
207   }
208 
209   /** Public for hbck */
210   public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
211     int pblen = ProtobufUtil.lengthOfPBMagic();
212     if (bytes == null || bytes.length < pblen) {
213       return null;
214     }
215     try {
216       ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom(
217           bytes, pblen, bytes.length - pblen).build();
218       return data;
219     } catch (InvalidProtocolBufferException ex) {
220       LOG.warn("Exception in deserialization", ex);
221     }
222     return null;
223   }
224 
225   /**
226    * ZooKeeper based TableLockManager
227    */
228   @InterfaceAudience.Private
229   private static class ZKTableLockManager extends TableLockManager {
230 
231     private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
232       @Override
233       public void handleMetadata(byte[] ownerMetadata) {
234         if (!LOG.isDebugEnabled()) {
235           return;
236         }
237         ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
238         if (data == null) {
239           return;
240         }
241         LOG.debug("Table is locked by " +
242             String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " +
243                 "purpose=%s, isShared=%s, createTime=%s]",
244                 data.getTableName().getNamespace().toStringUtf8(),
245                 data.getTableName().getQualifier().toStringUtf8(),
246                 ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
247                 data.getPurpose(), data.getIsShared(), data.getCreateTime()));
248       }
249     };
250 
251     private static class TableLockImpl implements TableLock {
252       long lockTimeoutMs;
253       TableName tableName;
254       InterProcessLock lock;
255       boolean isShared;
256       ZooKeeperWatcher zkWatcher;
257       ServerName serverName;
258       String purpose;
259 
260       public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher,
261           ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
262         this.tableName = tableName;
263         this.zkWatcher = zkWatcher;
264         this.serverName = serverName;
265         this.lockTimeoutMs = lockTimeoutMs;
266         this.isShared = isShared;
267         this.purpose = purpose;
268       }
269 
270       @Override
271       public void acquire() throws IOException {
272         if (LOG.isTraceEnabled()) {
273           LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") +
274             " lock on: " + tableName + " for:" + purpose);
275         }
276 
277         lock = createTableLock();
278         try {
279           if (lockTimeoutMs == -1) {
280             // Wait indefinitely
281             lock.acquire();
282           } else {
283             if (!lock.tryAcquire(lockTimeoutMs)) {
284               throw new LockTimeoutException("Timed out acquiring " +
285                 (isShared ? "read" : "write") + "lock for table:" + tableName +
286                 "for:" + purpose + " after " + lockTimeoutMs + " ms.");
287             }
288           }
289         } catch (InterruptedException e) {
290           LOG.warn("Interrupted acquiring a lock for " + tableName, e);
291           Thread.currentThread().interrupt();
292           throw new InterruptedIOException("Interrupted acquiring a lock");
293         }
294         if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write")
295             + " lock on " + tableName + " for " + purpose);
296       }
297 
298       @Override
299       public void release() throws IOException {
300         if (LOG.isTraceEnabled()) {
301           LOG.trace("Attempt to release table " + (isShared ? "read" : "write")
302               + " lock on " + tableName);
303         }
304         if (lock == null) {
305           throw new IllegalStateException("Table " + tableName +
306             " is not locked!");
307         }
308 
309         try {
310           lock.release();
311         } catch (InterruptedException e) {
312           LOG.warn("Interrupted while releasing a lock for " + tableName);
313           throw new InterruptedIOException();
314         }
315         if (LOG.isTraceEnabled()) {
316           LOG.trace("Released table lock on " + tableName);
317         }
318       }
319 
320       private InterProcessLock createTableLock() {
321         String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode,
322             tableName.getNameAsString());
323 
324         ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
325           .setTableName(ProtobufUtil.toProtoTableName(tableName))
326           .setLockOwner(ProtobufUtil.toServerName(serverName))
327           .setThreadId(Thread.currentThread().getId())
328           .setPurpose(purpose)
329           .setIsShared(isShared)
330           .setCreateTime(EnvironmentEdgeManager.currentTime()).build();
331         byte[] lockMetadata = toBytes(data);
332 
333         InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
334           METADATA_HANDLER);
335         return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
336       }
337     }
338 
339     private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
340       return ProtobufUtil.prependPBMagic(data.toByteArray());
341     }
342 
343     private final ServerName serverName;
344     private final ZooKeeperWatcher zkWatcher;
345     private final long writeLockTimeoutMs;
346     private final long readLockTimeoutMs;
347     private final long lockExpireTimeoutMs;
348 
349     /**
350      * Initialize a new manager for table-level locks.
351      * @param zkWatcher
352      * @param serverName Address of the server responsible for acquiring and
353      * releasing the table-level locks
354      * @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a
355      * given table, or -1 for no timeout
356      * @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a
357      * given table, or -1 for no timeout
358      */
359     public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
360       ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
361       this.zkWatcher = zkWatcher;
362       this.serverName = serverName;
363       this.writeLockTimeoutMs = writeLockTimeoutMs;
364       this.readLockTimeoutMs = readLockTimeoutMs;
365       this.lockExpireTimeoutMs = lockExpireTimeoutMs;
366     }
367 
368     @Override
369     public TableLock writeLock(TableName tableName, String purpose) {
370       return new TableLockImpl(tableName, zkWatcher,
371           serverName, writeLockTimeoutMs, false, purpose);
372     }
373 
374     public TableLock readLock(TableName tableName, String purpose) {
375       return new TableLockImpl(tableName, zkWatcher,
376           serverName, readLockTimeoutMs, true, purpose);
377     }
378 
379     public void visitAllLocks(MetadataHandler handler) throws IOException {
380       for (String tableName : getTableNames()) {
381         String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
382         ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
383             zkWatcher, tableLockZNode, null);
384         lock.readLock(null).visitLocks(handler);
385         lock.writeLock(null).visitLocks(handler);
386       }
387     }
388 
389     private List<String> getTableNames() throws IOException {
390 
391       List<String> tableNames;
392       try {
393         tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
394       } catch (KeeperException e) {
395         LOG.error("Unexpected ZooKeeper error when listing children", e);
396         throw new IOException("Unexpected ZooKeeper exception", e);
397       }
398       return tableNames;
399     }
400 
401     @Override
402     public void reapWriteLocks() throws IOException {
403       //get the table names
404       try {
405         for (String tableName : getTableNames()) {
406           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
407           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
408               zkWatcher, tableLockZNode, null);
409           lock.writeLock(null).reapAllLocks();
410         }
411       } catch (IOException ex) {
412         throw ex;
413       } catch (Exception ex) {
414         LOG.warn("Caught exception while reaping table write locks", ex);
415       }
416     }
417 
418     @Override
419     public void reapAllExpiredLocks() throws IOException {
420       //get the table names
421       try {
422         for (String tableName : getTableNames()) {
423           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
424           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
425               zkWatcher, tableLockZNode, null);
426           lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
427           lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
428         }
429       } catch (IOException ex) {
430         throw ex;
431       } catch (Exception ex) {
432         throw new IOException(ex);
433       }
434     }
435 
436     @Override
437     public void tableDeleted(TableName tableName) throws IOException {
438       //table write lock from DeleteHandler is already released, just delete the parent znode
439       String tableNameStr = tableName.getNameAsString();
440       String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
441       try {
442         ZKUtil.deleteNode(zkWatcher, tableLockZNode);
443       } catch (KeeperException ex) {
444         if (ex.code() == KeeperException.Code.NOTEMPTY) {
445           //we might get this in rare occasions where a CREATE table or some other table operation
446           //is waiting to acquire the lock. In this case, parent znode won't be deleted.
447           LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
448               + tableLockZNode);
449           return;
450         }
451         throw new IOException(ex);
452       }
453     }
454   }
455 }