View Javadoc

1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.InterProcessLock;
32  import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
33  import org.apache.hadoop.hbase.InterProcessReadWriteLock;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42  import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
43  import org.apache.zookeeper.KeeperException;
44  
45  /**
46   * A manager for distributed table level locks.
47   */
48  @InterfaceAudience.Private
49  public abstract class TableLockManager {
50  
51    private static final Log LOG = LogFactory.getLog(TableLockManager.class);
52  
53    /** Configuration key for enabling table-level locks for schema changes */
54    public static final String TABLE_LOCK_ENABLE =
55      "hbase.table.lock.enable";
56  
57    /** by default we should enable table-level locks for schema changes */
58    private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
59  
60    /** Configuration key for time out for trying to acquire table locks */
61    protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
62      "hbase.table.write.lock.timeout.ms";
63  
64    /** Configuration key for time out for trying to acquire table locks */
65    protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
66      "hbase.table.read.lock.timeout.ms";
67  
68    protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
69      600 * 1000; //10 min default
70  
71    protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
72      600 * 1000; //10 min default
73  
74    public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
75  
76    public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
77        600 * 1000; //10 min default
78  
79    /**
80     * A distributed lock for a table.
81     */
82    @InterfaceAudience.Private
83    public interface TableLock {
84      /**
85       * Acquire the lock, with the configured lock timeout.
86       * @throws LockTimeoutException If unable to acquire a lock within a specified
87       * time period (if any)
88       * @throws IOException If unrecoverable error occurs
89       */
90      void acquire() throws IOException;
91  
92      /**
93       * Release the lock already held.
94       * @throws IOException If there is an unrecoverable error releasing the lock
95       */
96      void release() throws IOException;
97    }
98  
99    /**
100    * Returns a TableLock for locking the table for exclusive access
101    * @param tableName Table to lock
102    * @param purpose Human readable reason for locking the table
103    * @return A new TableLock object for acquiring a write lock
104    */
105   public abstract TableLock writeLock(TableName tableName, String purpose);
106 
107   /**
108    * Returns a TableLock for locking the table for shared access among read-lock holders
109    * @param tableName Table to lock
110    * @param purpose Human readable reason for locking the table
111    * @return A new TableLock object for acquiring a read lock
112    */
113   public abstract TableLock readLock(TableName tableName, String purpose);
114 
115   /**
116    * Visits all table locks(read and write), and lock attempts with the given callback
117    * MetadataHandler.
118    * @param handler the metadata handler to call
119    * @throws IOException If there is an unrecoverable error
120    */
121   public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
122 
123   /**
124    * Force releases all table locks(read and write) that have been held longer than
125    * "hbase.table.lock.expire.ms". Assumption is that the clock skew between zookeeper
126    * and this servers is negligible.
127    * The behavior of the lock holders still thinking that they have the lock is undefined.
128    * @throws IOException If there is an unrecoverable error
129    */
130   public abstract void reapAllExpiredLocks() throws IOException;
131 
132   /**
133    * Force releases table write locks and lock attempts even if this thread does
134    * not own the lock. The behavior of the lock holders still thinking that they
135    * have the lock is undefined. This should be used carefully and only when
136    * we can ensure that all write-lock holders have died. For example if only
137    * the master can hold write locks, then we can reap it's locks when the backup
138    * master starts.
139    * @throws IOException If there is an unrecoverable error
140    */
141   public abstract void reapWriteLocks() throws IOException;
142 
143   /**
144    * Called after a table has been deleted, and after the table lock is  released.
145    * TableLockManager should do cleanup for the table state.
146    * @param tableName name of the table
147    * @throws IOException If there is an unrecoverable error releasing the lock
148    */
149   public abstract void tableDeleted(TableName tableName)
150       throws IOException;
151 
152   /**
153    * Creates and returns a TableLockManager according to the configuration
154    */
155   public static TableLockManager createTableLockManager(Configuration conf,
156       ZooKeeperWatcher zkWatcher, ServerName serverName) {
157     // Initialize table level lock manager for schema changes, if enabled.
158     if (conf.getBoolean(TABLE_LOCK_ENABLE,
159         DEFAULT_TABLE_LOCK_ENABLE)) {
160       long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
161           DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
162       long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
163           DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
164       long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
165           DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
166 
167       return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
168     }
169 
170     return new NullTableLockManager();
171   }
172 
173   /**
174    * A null implementation
175    */
176   @InterfaceAudience.Private
177   public static class NullTableLockManager extends TableLockManager {
178     static class NullTableLock implements TableLock {
179       @Override
180       public void acquire() throws IOException {
181       }
182       @Override
183       public void release() throws IOException {
184       }
185     }
186     @Override
187     public TableLock writeLock(TableName tableName, String purpose) {
188       return new NullTableLock();
189     }
190     @Override
191     public TableLock readLock(TableName tableName, String purpose) {
192       return new NullTableLock();
193     }
194     @Override
195     public void reapAllExpiredLocks() throws IOException {
196     }
197     @Override
198     public void reapWriteLocks() throws IOException {
199     }
200     @Override
201     public void tableDeleted(TableName tableName) throws IOException {
202     }
203     @Override
204     public void visitAllLocks(MetadataHandler handler) throws IOException {
205     }
206   }
207 
208   /** Public for hbck */
209   public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
210     int pblen = ProtobufUtil.lengthOfPBMagic();
211     if (bytes == null || bytes.length < pblen) {
212       return null;
213     }
214     try {
215       ZooKeeperProtos.TableLock.Builder builder = ZooKeeperProtos.TableLock.newBuilder();
216       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
217       return builder.build();
218     } catch (IOException ex) {
219       LOG.warn("Exception in deserialization", ex);
220     }
221     return null;
222   }
223 
224   /**
225    * ZooKeeper based TableLockManager
226    */
227   @InterfaceAudience.Private
228   private static class ZKTableLockManager extends TableLockManager {
229 
230     private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
231       @Override
232       public void handleMetadata(byte[] ownerMetadata) {
233         if (!LOG.isDebugEnabled()) {
234           return;
235         }
236         ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
237         if (data == null) {
238           return;
239         }
240         LOG.debug("Table is locked by " +
241             String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " +
242                 "purpose=%s, isShared=%s, createTime=%s]",
243                 data.getTableName().getNamespace().toStringUtf8(),
244                 data.getTableName().getQualifier().toStringUtf8(),
245                 ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
246                 data.getPurpose(), data.getIsShared(), data.getCreateTime()));
247       }
248     };
249 
250     private static class TableLockImpl implements TableLock {
251       long lockTimeoutMs;
252       TableName tableName;
253       InterProcessLock lock;
254       boolean isShared;
255       ZooKeeperWatcher zkWatcher;
256       ServerName serverName;
257       String purpose;
258 
259       public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher,
260           ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
261         this.tableName = tableName;
262         this.zkWatcher = zkWatcher;
263         this.serverName = serverName;
264         this.lockTimeoutMs = lockTimeoutMs;
265         this.isShared = isShared;
266         this.purpose = purpose;
267       }
268 
269       @Override
270       public void acquire() throws IOException {
271         if (LOG.isTraceEnabled()) {
272           LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") +
273             " lock on: " + tableName + " for:" + purpose);
274         }
275 
276         lock = createTableLock();
277         try {
278           if (lockTimeoutMs == -1) {
279             // Wait indefinitely
280             lock.acquire();
281           } else {
282             if (!lock.tryAcquire(lockTimeoutMs)) {
283               throw new LockTimeoutException("Timed out acquiring " +
284                 (isShared ? "read" : "write") + "lock for table:" + tableName +
285                 "for:" + purpose + " after " + lockTimeoutMs + " ms.");
286             }
287           }
288         } catch (InterruptedException e) {
289           LOG.warn("Interrupted acquiring a lock for " + tableName, e);
290           Thread.currentThread().interrupt();
291           throw new InterruptedIOException("Interrupted acquiring a lock");
292         }
293         if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write")
294             + " lock on " + tableName + " for " + purpose);
295       }
296 
297       @Override
298       public void release() throws IOException {
299         if (LOG.isTraceEnabled()) {
300           LOG.trace("Attempt to release table " + (isShared ? "read" : "write")
301               + " lock on " + tableName);
302         }
303         if (lock == null) {
304           throw new IllegalStateException("Table " + tableName +
305             " is not locked!");
306         }
307 
308         try {
309           lock.release();
310         } catch (InterruptedException e) {
311           LOG.warn("Interrupted while releasing a lock for " + tableName);
312           throw new InterruptedIOException();
313         }
314         if (LOG.isTraceEnabled()) {
315           LOG.trace("Released table lock on " + tableName);
316         }
317       }
318 
319       private InterProcessLock createTableLock() {
320         String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode,
321             tableName.getNameAsString());
322 
323         ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
324           .setTableName(ProtobufUtil.toProtoTableName(tableName))
325           .setLockOwner(ProtobufUtil.toServerName(serverName))
326           .setThreadId(Thread.currentThread().getId())
327           .setPurpose(purpose)
328           .setIsShared(isShared)
329           .setCreateTime(EnvironmentEdgeManager.currentTime()).build();
330         byte[] lockMetadata = toBytes(data);
331 
332         InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
333           METADATA_HANDLER);
334         return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
335       }
336     }
337 
338     private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
339       return ProtobufUtil.prependPBMagic(data.toByteArray());
340     }
341 
342     private final ServerName serverName;
343     private final ZooKeeperWatcher zkWatcher;
344     private final long writeLockTimeoutMs;
345     private final long readLockTimeoutMs;
346     private final long lockExpireTimeoutMs;
347 
348     /**
349      * Initialize a new manager for table-level locks.
350      * @param zkWatcher
351      * @param serverName Address of the server responsible for acquiring and
352      * releasing the table-level locks
353      * @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a
354      * given table, or -1 for no timeout
355      * @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a
356      * given table, or -1 for no timeout
357      */
358     public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
359       ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
360       this.zkWatcher = zkWatcher;
361       this.serverName = serverName;
362       this.writeLockTimeoutMs = writeLockTimeoutMs;
363       this.readLockTimeoutMs = readLockTimeoutMs;
364       this.lockExpireTimeoutMs = lockExpireTimeoutMs;
365     }
366 
367     @Override
368     public TableLock writeLock(TableName tableName, String purpose) {
369       return new TableLockImpl(tableName, zkWatcher,
370           serverName, writeLockTimeoutMs, false, purpose);
371     }
372 
373     public TableLock readLock(TableName tableName, String purpose) {
374       return new TableLockImpl(tableName, zkWatcher,
375           serverName, readLockTimeoutMs, true, purpose);
376     }
377 
378     public void visitAllLocks(MetadataHandler handler) throws IOException {
379       for (String tableName : getTableNames()) {
380         String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
381         ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
382             zkWatcher, tableLockZNode, null);
383         lock.readLock(null).visitLocks(handler);
384         lock.writeLock(null).visitLocks(handler);
385       }
386     }
387 
388     private List<String> getTableNames() throws IOException {
389 
390       List<String> tableNames;
391       try {
392         tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
393       } catch (KeeperException e) {
394         LOG.error("Unexpected ZooKeeper error when listing children", e);
395         throw new IOException("Unexpected ZooKeeper exception", e);
396       }
397       return tableNames;
398     }
399 
400     @Override
401     public void reapWriteLocks() throws IOException {
402       //get the table names
403       try {
404         for (String tableName : getTableNames()) {
405           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
406           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
407               zkWatcher, tableLockZNode, null);
408           lock.writeLock(null).reapAllLocks();
409         }
410       } catch (IOException ex) {
411         throw ex;
412       } catch (Exception ex) {
413         LOG.warn("Caught exception while reaping table write locks", ex);
414       }
415     }
416 
417     @Override
418     public void reapAllExpiredLocks() throws IOException {
419       //get the table names
420       try {
421         for (String tableName : getTableNames()) {
422           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
423           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
424               zkWatcher, tableLockZNode, null);
425           lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
426           lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
427         }
428       } catch (IOException ex) {
429         throw ex;
430       } catch (Exception ex) {
431         throw new IOException(ex);
432       }
433     }
434 
435     @Override
436     public void tableDeleted(TableName tableName) throws IOException {
437       //table write lock from DeleteHandler is already released, just delete the parent znode
438       String tableNameStr = tableName.getNameAsString();
439       String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
440       try {
441         ZKUtil.deleteNode(zkWatcher, tableLockZNode);
442       } catch (KeeperException ex) {
443         if (ex.code() == KeeperException.Code.NOTEMPTY) {
444           //we might get this in rare occasions where a CREATE table or some other table operation
445           //is waiting to acquire the lock. In this case, parent znode won't be deleted.
446           LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
447               + tableLockZNode);
448           return;
449         }
450         throw new IOException(ex);
451       }
452     }
453   }
454 }