View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.master.procedure;
20  
21  import java.io.IOException;
22  import java.util.ArrayDeque;
23  import java.util.Deque;
24  import java.util.concurrent.locks.Condition;
25  import java.util.concurrent.locks.ReentrantLock;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.TableExistsException;
32  import org.apache.hadoop.hbase.TableNotFoundException;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.classification.InterfaceStability;
35  import org.apache.hadoop.hbase.procedure2.Procedure;
36  import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues;
37  import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
38  import org.apache.hadoop.hbase.master.TableLockManager;
39  import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
40  import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
41  
42  /**
43   * ProcedureRunnableSet for the Master Procedures.
44   * This RunnableSet tries to provide to the ProcedureExecutor procedures
45   * that can be executed without having to wait on a lock.
46   * Most of the master operations can be executed concurrently, if the they
47   * are operating on different tables (e.g. two create table can be performed
48   * at the same, time assuming table A and table B).
49   *
50   * Each procedure should implement an interface providing information for this queue.
51   * for example table related procedures should implement TableProcedureInterface.
52   * each procedure will be pushed in its own queue, and based on the operation type
53   * we may take smarter decision. e.g. we can abort all the operations preceding
54   * a delete table, or similar.
55   */
56  @InterfaceAudience.Private
57  @InterfaceStability.Evolving
58  public class MasterProcedureQueue implements ProcedureRunnableSet {
59    private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class);
60  
61    private final ProcedureFairRunQueues<TableName, RunQueue> fairq;
62    private final ReentrantLock lock = new ReentrantLock();
63    private final Condition waitCond = lock.newCondition();
64    private final TableLockManager lockManager;
65  
66    private final int metaTablePriority;
67    private final int userTablePriority;
68    private final int sysTablePriority;
69  
70    private int queueSize;
71  
72    public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) {
73      this.fairq = new ProcedureFairRunQueues<TableName, RunQueue>(1);
74      this.lockManager = lockManager;
75  
76      // TODO: should this be part of the HTD?
77      metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
78      sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
79      userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
80    }
81  
82    @Override
83    public void addFront(final Procedure proc) {
84      lock.lock();
85      try {
86        getRunQueueOrCreate(proc).addFront(proc);
87        queueSize++;
88        waitCond.signal();
89      } finally {
90        lock.unlock();
91      }
92    }
93  
94    @Override
95    public void addBack(final Procedure proc) {
96      lock.lock();
97      try {
98        getRunQueueOrCreate(proc).addBack(proc);
99        queueSize++;
100       waitCond.signal();
101     } finally {
102       lock.unlock();
103     }
104   }
105 
106   @Override
107   public void yield(final Procedure proc) {
108     addFront(proc);
109   }
110 
111   @Override
112   @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
113   public Long poll() {
114     lock.lock();
115     try {
116       if (queueSize == 0) {
117         waitCond.await();
118         if (queueSize == 0) {
119           return null;
120         }
121       }
122 
123       RunQueue queue = fairq.poll();
124       if (queue != null && queue.isAvailable()) {
125         queueSize--;
126         return queue.poll();
127       }
128     } catch (InterruptedException e) {
129       Thread.currentThread().interrupt();
130       return null;
131     } finally {
132       lock.unlock();
133     }
134     return null;
135   }
136 
137   @Override
138   public void signalAll() {
139     lock.lock();
140     try {
141       waitCond.signalAll();
142     } finally {
143       lock.unlock();
144     }
145   }
146 
147   @Override
148   public void clear() {
149     lock.lock();
150     try {
151       fairq.clear();
152       queueSize = 0;
153     } finally {
154       lock.unlock();
155     }
156   }
157 
158   @Override
159   public int size() {
160     lock.lock();
161     try {
162       return queueSize;
163     } finally {
164       lock.unlock();
165     }
166   }
167 
168   @Override
169   public String toString() {
170     lock.lock();
171     try {
172       return "MasterProcedureQueue size=" + queueSize + ": " + fairq;
173     } finally {
174       lock.unlock();
175     }
176   }
177 
178   @Override
179   public void completionCleanup(Procedure proc) {
180     if (proc instanceof TableProcedureInterface) {
181       TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
182       boolean tableDeleted;
183       if (proc.hasException()) {
184         IOException procEx =  proc.getException().unwrapRemoteException();
185         if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
186           // create failed because the table already exist
187           tableDeleted = !(procEx instanceof TableExistsException);
188         } else {
189           // the operation failed because the table does not exist
190           tableDeleted = (procEx instanceof TableNotFoundException);
191         }
192       } else {
193         // the table was deleted
194         tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
195       }
196       if (tableDeleted) {
197         markTableAsDeleted(iProcTable.getTableName());
198       }
199     }
200   }
201 
202   private RunQueue getRunQueueOrCreate(final Procedure proc) {
203     if (proc instanceof TableProcedureInterface) {
204       final TableName table = ((TableProcedureInterface)proc).getTableName();
205       return getRunQueueOrCreate(table);
206     }
207     // TODO: at the moment we only have Table procedures
208     // if you are implementing a non-table procedure, you have two option create
209     // a group for all the non-table procedures or try to find a key for your
210     // non-table procedure and implement something similar to the TableRunQueue.
211     throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet");
212   }
213 
214   private TableRunQueue getRunQueueOrCreate(final TableName table) {
215     final TableRunQueue queue = getRunQueue(table);
216     if (queue != null) return queue;
217     return (TableRunQueue)fairq.add(table, createTableRunQueue(table));
218   }
219 
220   private TableRunQueue createTableRunQueue(final TableName table) {
221     int priority = userTablePriority;
222     if (table.equals(TableName.META_TABLE_NAME)) {
223       priority = metaTablePriority;
224     } else if (table.isSystemTable()) {
225       priority = sysTablePriority;
226     }
227     return new TableRunQueue(priority);
228   }
229 
230   private TableRunQueue getRunQueue(final TableName table) {
231     return (TableRunQueue)fairq.get(table);
232   }
233 
234   /**
235    * Try to acquire the read lock on the specified table.
236    * other read operations in the table-queue may be executed concurrently,
237    * otherwise they have to wait until all the read-locks are released.
238    * @param table Table to lock
239    * @param purpose Human readable reason for locking the table
240    * @return true if we were able to acquire the lock on the table, otherwise false.
241    */
242   public boolean tryAcquireTableRead(final TableName table, final String purpose) {
243     return getRunQueueOrCreate(table).tryRead(lockManager, table, purpose);
244   }
245 
246   /**
247    * Release the read lock taken with tryAcquireTableRead()
248    * @param table the name of the table that has the read lock
249    */
250   public void releaseTableRead(final TableName table) {
251     getRunQueue(table).releaseRead(lockManager, table);
252   }
253 
254   /**
255    * Try to acquire the write lock on the specified table.
256    * other operations in the table-queue will be executed after the lock is released.
257    * @param table Table to lock
258    * @param purpose Human readable reason for locking the table
259    * @return true if we were able to acquire the lock on the table, otherwise false.
260    */
261   public boolean tryAcquireTableWrite(final TableName table, final String purpose) {
262     return getRunQueueOrCreate(table).tryWrite(lockManager, table, purpose);
263   }
264 
265   /**
266    * Release the write lock taken with tryAcquireTableWrite()
267    * @param table the name of the table that has the write lock
268    */
269   public void releaseTableWrite(final TableName table) {
270     getRunQueue(table).releaseWrite(lockManager, table);
271   }
272 
273   /**
274    * Tries to remove the queue and the table-lock of the specified table.
275    * If there are new operations pending (e.g. a new create),
276    * the remove will not be performed.
277    * @param table the name of the table that should be marked as deleted
278    * @return true if deletion succeeded, false otherwise meaning that there are
279    *    other new operations pending for that table (e.g. a new create).
280    */
281   protected boolean markTableAsDeleted(final TableName table) {
282     TableRunQueue queue = getRunQueue(table);
283     if (queue != null) {
284       lock.lock();
285       try {
286         if (queue.isEmpty() && queue.acquireDeleteLock()) {
287           fairq.remove(table);
288 
289           // Remove the table lock
290           try {
291             lockManager.tableDeleted(table);
292           } catch (IOException e) {
293             LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical
294           }
295         } else {
296           // TODO: If there are no create, we can drop all the other ops
297           return false;
298         }
299       } finally {
300         lock.unlock();
301       }
302     }
303     return true;
304   }
305 
306   private interface RunQueue extends ProcedureFairRunQueues.FairObject {
307     void addFront(Procedure proc);
308     void addBack(Procedure proc);
309     Long poll();
310     boolean acquireDeleteLock();
311   }
312 
313   /**
314    * Run Queue for a Table. It contains a read-write lock that is used by the
315    * MasterProcedureQueue to decide if we should fetch an item from this queue
316    * or skip to another one which will be able to run without waiting for locks.
317    */
318   private static class TableRunQueue implements RunQueue {
319     private final Deque<Long> runnables = new ArrayDeque<Long>();
320     private final int priority;
321 
322     private TableLock tableLock = null;
323     private boolean wlock = false;
324     private int rlock = 0;
325 
326     public TableRunQueue(int priority) {
327       this.priority = priority;
328     }
329 
330     @Override
331     public void addFront(final Procedure proc) {
332       runnables.addFirst(proc.getProcId());
333     }
334 
335     // TODO: Improve run-queue push with TableProcedureInterface.getType()
336     //       we can take smart decisions based on the type of the operation (e.g. create/delete)
337     @Override
338     public void addBack(final Procedure proc) {
339       runnables.addLast(proc.getProcId());
340     }
341 
342     @Override
343     public Long poll() {
344       return runnables.poll();
345     }
346 
347     @Override
348     public boolean isAvailable() {
349       synchronized (this) {
350         return !wlock && !runnables.isEmpty();
351       }
352     }
353 
354     public boolean isEmpty() {
355       return runnables.isEmpty();
356     }
357 
358     @Override
359     public synchronized boolean acquireDeleteLock() {
360       if (isLocked()) {
361         return false;
362       }
363       wlock = true;
364       return true;
365     }
366 
367     public boolean isLocked() {
368       synchronized (this) {
369         return wlock || rlock > 0;
370       }
371     }
372 
373     public boolean tryRead(final TableLockManager lockManager,
374         final TableName tableName, final String purpose) {
375       synchronized (this) {
376         if (wlock) {
377           return false;
378         }
379 
380         // Take zk-read-lock
381         tableLock = lockManager.readLock(tableName, purpose);
382         try {
383           tableLock.acquire();
384         } catch (IOException e) {
385           LOG.error("failed acquire read lock on " + tableName, e);
386           tableLock = null;
387           return false;
388         }
389 
390         rlock++;
391       }
392       return true;
393     }
394 
395     public void releaseRead(final TableLockManager lockManager,
396         final TableName tableName) {
397       synchronized (this) {
398         releaseTableLock(lockManager, rlock == 1);
399         rlock--;
400       }
401     }
402 
403     public boolean tryWrite(final TableLockManager lockManager,
404         final TableName tableName, final String purpose) {
405       synchronized (this) {
406         if (wlock || rlock > 0) {
407           return false;
408         }
409 
410         // Take zk-write-lock
411         tableLock = lockManager.writeLock(tableName, purpose);
412         try {
413           tableLock.acquire();
414         } catch (IOException e) {
415           LOG.error("failed acquire write lock on " + tableName, e);
416           tableLock = null;
417           return false;
418         }
419         wlock = true;
420       }
421       return true;
422     }
423 
424     public void releaseWrite(final TableLockManager lockManager,
425         final TableName tableName) {
426       synchronized (this) {
427         releaseTableLock(lockManager, true);
428         wlock = false;
429       }
430     }
431 
432     private void releaseTableLock(final TableLockManager lockManager, boolean reset) {
433       for (int i = 0; i < 3; ++i) {
434         try {
435           tableLock.release();
436           if (reset) {
437             tableLock = null;
438           }
439           break;
440         } catch (IOException e) {
441           LOG.warn("Could not release the table write-lock", e);
442         }
443       }
444     }
445 
446     @Override
447     public int getPriority() {
448       return priority;
449     }
450 
451     @Override
452     public String toString() {
453       return runnables.toString();
454     }
455   }
456 }