View Javadoc

1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.InterProcessLock;
32  import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
33  import org.apache.hadoop.hbase.InterProcessReadWriteLock;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42  import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
43  import org.apache.zookeeper.KeeperException;
44  
45  import com.google.protobuf.InvalidProtocolBufferException;
46  
47  /**
48   * A manager for distributed table level locks.
49   */
50  @InterfaceAudience.Private
51  public abstract class TableLockManager {
52  
53    private static final Log LOG = LogFactory.getLog(TableLockManager.class);
54  
55    /** Configuration key for enabling table-level locks for schema changes */
56    public static final String TABLE_LOCK_ENABLE =
57      "hbase.table.lock.enable";
58  
59    /** by default we should enable table-level locks for schema changes */
60    private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
61  
62    /** Configuration key for time out for trying to acquire table locks */
63    protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
64      "hbase.table.write.lock.timeout.ms";
65  
66    /** Configuration key for time out for trying to acquire table locks */
67    protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
68      "hbase.table.read.lock.timeout.ms";
69  
70    protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
71      600 * 1000; //10 min default
72  
73    protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
74      600 * 1000; //10 min default
75  
76    public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
77  
78    public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
79        600 * 1000; //10 min default
80  
81    /**
82     * A distributed lock for a table.
83     */
84    @InterfaceAudience.Private
85    public interface TableLock {
86      /**
87       * Acquire the lock, with the configured lock timeout.
88       * @throws LockTimeoutException If unable to acquire a lock within a specified
89       * time period (if any)
90       * @throws IOException If unrecoverable error occurs
91       */
92      void acquire() throws IOException;
93  
94      /**
95       * Release the lock already held.
96       * @throws IOException If there is an unrecoverable error releasing the lock
97       */
98      void release() throws IOException;
99    }
100 
101   /**
102    * Returns a TableLock for locking the table for exclusive access
103    * @param tableName Table to lock
104    * @param purpose Human readable reason for locking the table
105    * @return A new TableLock object for acquiring a write lock
106    */
107   public abstract TableLock writeLock(TableName tableName, String purpose);
108 
109   /**
110    * Returns a TableLock for locking the table for shared access among read-lock holders
111    * @param tableName Table to lock
112    * @param purpose Human readable reason for locking the table
113    * @return A new TableLock object for acquiring a read lock
114    */
115   public abstract TableLock readLock(TableName tableName, String purpose);
116 
117   /**
118    * Visits all table locks(read and write), and lock attempts with the given callback
119    * MetadataHandler.
120    * @param handler the metadata handler to call
121    * @throws IOException If there is an unrecoverable error
122    */
123   public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
124 
125   /**
126    * Force releases all table locks(read and write) that have been held longer than
127    * "hbase.table.lock.expire.ms". Assumption is that the clock skew between zookeeper
128    * and this servers is negligible.
129    * The behavior of the lock holders still thinking that they have the lock is undefined.
130    * @throws IOException If there is an unrecoverable error
131    */
132   public abstract void reapAllExpiredLocks() throws IOException;
133 
134   /**
135    * Force releases table write locks and lock attempts even if this thread does
136    * not own the lock. The behavior of the lock holders still thinking that they
137    * have the lock is undefined. This should be used carefully and only when
138    * we can ensure that all write-lock holders have died. For example if only
139    * the master can hold write locks, then we can reap it's locks when the backup
140    * master starts.
141    * @throws IOException If there is an unrecoverable error
142    */
143   public abstract void reapWriteLocks() throws IOException;
144 
145   /**
146    * Called after a table has been deleted, and after the table lock is  released.
147    * TableLockManager should do cleanup for the table state.
148    * @param tableName name of the table
149    * @throws IOException If there is an unrecoverable error releasing the lock
150    */
151   public abstract void tableDeleted(TableName tableName)
152       throws IOException;
153 
154   /**
155    * Creates and returns a TableLockManager according to the configuration
156    */
157   public static TableLockManager createTableLockManager(Configuration conf,
158       ZooKeeperWatcher zkWatcher, ServerName serverName) {
159     // Initialize table level lock manager for schema changes, if enabled.
160     if (conf.getBoolean(TABLE_LOCK_ENABLE,
161         DEFAULT_TABLE_LOCK_ENABLE)) {
162       long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
163           DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
164       long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
165           DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
166       long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
167           DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
168 
169       return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
170     }
171 
172     return new NullTableLockManager();
173   }
174 
175   /**
176    * A null implementation
177    */
178   @InterfaceAudience.Private
179   public static class NullTableLockManager extends TableLockManager {
180     static class NullTableLock implements TableLock {
181       @Override
182       public void acquire() throws IOException {
183       }
184       @Override
185       public void release() throws IOException {
186       }
187     }
188     @Override
189     public TableLock writeLock(TableName tableName, String purpose) {
190       return new NullTableLock();
191     }
192     @Override
193     public TableLock readLock(TableName tableName, String purpose) {
194       return new NullTableLock();
195     }
196     @Override
197     public void reapAllExpiredLocks() throws IOException {
198     }
199     @Override
200     public void reapWriteLocks() throws IOException {
201     }
202     @Override
203     public void tableDeleted(TableName tableName) throws IOException {
204     }
205     @Override
206     public void visitAllLocks(MetadataHandler handler) throws IOException {
207     }
208   }
209 
210   /** Public for hbck */
211   public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
212     int pblen = ProtobufUtil.lengthOfPBMagic();
213     if (bytes == null || bytes.length < pblen) {
214       return null;
215     }
216     try {
217       ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom(
218           bytes, pblen, bytes.length - pblen).build();
219       return data;
220     } catch (InvalidProtocolBufferException ex) {
221       LOG.warn("Exception in deserialization", ex);
222     }
223     return null;
224   }
225 
226   /**
227    * ZooKeeper based TableLockManager
228    */
229   @InterfaceAudience.Private
230   private static class ZKTableLockManager extends TableLockManager {
231 
232     private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
233       @Override
234       public void handleMetadata(byte[] ownerMetadata) {
235         if (!LOG.isDebugEnabled()) {
236           return;
237         }
238         ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
239         if (data == null) {
240           return;
241         }
242         LOG.debug("Table is locked by " +
243             String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " +
244                 "purpose=%s, isShared=%s, createTime=%s]",
245                 data.getTableName().getNamespace().toStringUtf8(),
246                 data.getTableName().getQualifier().toStringUtf8(),
247                 ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
248                 data.getPurpose(), data.getIsShared(), data.getCreateTime()));
249       }
250     };
251 
252     private static class TableLockImpl implements TableLock {
253       long lockTimeoutMs;
254       TableName tableName;
255       InterProcessLock lock;
256       boolean isShared;
257       ZooKeeperWatcher zkWatcher;
258       ServerName serverName;
259       String purpose;
260 
261       public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher,
262           ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
263         this.tableName = tableName;
264         this.zkWatcher = zkWatcher;
265         this.serverName = serverName;
266         this.lockTimeoutMs = lockTimeoutMs;
267         this.isShared = isShared;
268         this.purpose = purpose;
269       }
270 
271       @Override
272       public void acquire() throws IOException {
273         if (LOG.isTraceEnabled()) {
274           LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") +
275             " lock on: " + tableName + " for:" + purpose);
276         }
277 
278         lock = createTableLock();
279         try {
280           if (lockTimeoutMs == -1) {
281             // Wait indefinitely
282             lock.acquire();
283           } else {
284             if (!lock.tryAcquire(lockTimeoutMs)) {
285               throw new LockTimeoutException("Timed out acquiring " +
286                 (isShared ? "read" : "write") + "lock for table:" + tableName +
287                 "for:" + purpose + " after " + lockTimeoutMs + " ms.");
288             }
289           }
290         } catch (InterruptedException e) {
291           LOG.warn("Interrupted acquiring a lock for " + tableName, e);
292           Thread.currentThread().interrupt();
293           throw new InterruptedIOException("Interrupted acquiring a lock");
294         }
295         if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write")
296             + " lock on " + tableName + " for " + purpose);
297       }
298 
299       @Override
300       public void release() throws IOException {
301         if (LOG.isTraceEnabled()) {
302           LOG.trace("Attempt to release table " + (isShared ? "read" : "write")
303               + " lock on " + tableName);
304         }
305         if (lock == null) {
306           throw new IllegalStateException("Table " + tableName +
307             " is not locked!");
308         }
309 
310         try {
311           lock.release();
312         } catch (InterruptedException e) {
313           LOG.warn("Interrupted while releasing a lock for " + tableName);
314           throw new InterruptedIOException();
315         }
316         if (LOG.isTraceEnabled()) {
317           LOG.trace("Released table lock on " + tableName);
318         }
319       }
320 
321       private InterProcessLock createTableLock() {
322         String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode,
323             tableName.getNameAsString());
324 
325         ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
326           .setTableName(ProtobufUtil.toProtoTableName(tableName))
327           .setLockOwner(ProtobufUtil.toServerName(serverName))
328           .setThreadId(Thread.currentThread().getId())
329           .setPurpose(purpose)
330           .setIsShared(isShared)
331           .setCreateTime(EnvironmentEdgeManager.currentTime()).build();
332         byte[] lockMetadata = toBytes(data);
333 
334         InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
335           METADATA_HANDLER);
336         return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
337       }
338     }
339 
340     private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
341       return ProtobufUtil.prependPBMagic(data.toByteArray());
342     }
343 
344     private final ServerName serverName;
345     private final ZooKeeperWatcher zkWatcher;
346     private final long writeLockTimeoutMs;
347     private final long readLockTimeoutMs;
348     private final long lockExpireTimeoutMs;
349 
350     /**
351      * Initialize a new manager for table-level locks.
352      * @param zkWatcher
353      * @param serverName Address of the server responsible for acquiring and
354      * releasing the table-level locks
355      * @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a
356      * given table, or -1 for no timeout
357      * @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a
358      * given table, or -1 for no timeout
359      */
360     public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
361       ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
362       this.zkWatcher = zkWatcher;
363       this.serverName = serverName;
364       this.writeLockTimeoutMs = writeLockTimeoutMs;
365       this.readLockTimeoutMs = readLockTimeoutMs;
366       this.lockExpireTimeoutMs = lockExpireTimeoutMs;
367     }
368 
369     @Override
370     public TableLock writeLock(TableName tableName, String purpose) {
371       return new TableLockImpl(tableName, zkWatcher,
372           serverName, writeLockTimeoutMs, false, purpose);
373     }
374 
375     public TableLock readLock(TableName tableName, String purpose) {
376       return new TableLockImpl(tableName, zkWatcher,
377           serverName, readLockTimeoutMs, true, purpose);
378     }
379 
380     public void visitAllLocks(MetadataHandler handler) throws IOException {
381       for (String tableName : getTableNames()) {
382         String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
383         ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
384             zkWatcher, tableLockZNode, null);
385         lock.readLock(null).visitLocks(handler);
386         lock.writeLock(null).visitLocks(handler);
387       }
388     }
389 
390     private List<String> getTableNames() throws IOException {
391 
392       List<String> tableNames;
393       try {
394         tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
395       } catch (KeeperException e) {
396         LOG.error("Unexpected ZooKeeper error when listing children", e);
397         throw new IOException("Unexpected ZooKeeper exception", e);
398       }
399       return tableNames;
400     }
401 
402     @Override
403     public void reapWriteLocks() throws IOException {
404       //get the table names
405       try {
406         for (String tableName : getTableNames()) {
407           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
408           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
409               zkWatcher, tableLockZNode, null);
410           lock.writeLock(null).reapAllLocks();
411         }
412       } catch (IOException ex) {
413         throw ex;
414       } catch (Exception ex) {
415         LOG.warn("Caught exception while reaping table write locks", ex);
416       }
417     }
418 
419     @Override
420     public void reapAllExpiredLocks() throws IOException {
421       //get the table names
422       try {
423         for (String tableName : getTableNames()) {
424           String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
425           ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
426               zkWatcher, tableLockZNode, null);
427           lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
428           lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
429         }
430       } catch (IOException ex) {
431         throw ex;
432       } catch (Exception ex) {
433         throw new IOException(ex);
434       }
435     }
436 
437     @Override
438     public void tableDeleted(TableName tableName) throws IOException {
439       //table write lock from DeleteHandler is already released, just delete the parent znode
440       String tableNameStr = tableName.getNameAsString();
441       String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
442       try {
443         ZKUtil.deleteNode(zkWatcher, tableLockZNode);
444       } catch (KeeperException ex) {
445         if (ex.code() == KeeperException.Code.NOTEMPTY) {
446           //we might get this in rare occasions where a CREATE table or some other table operation
447           //is waiting to acquire the lock. In this case, parent znode won't be deleted.
448           LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
449               + tableLockZNode);
450           return;
451         }
452         throw new IOException(ex);
453       }
454     }
455   }
456 }