View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import java.io.IOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.util.concurrent.Executors;
25  
26  import org.apache.hadoop.hbase.Server;
27  
28  import com.google.common.util.concurrent.ThreadFactoryBuilder;
29  
30  /**
31   * Base class used bulk assigning and unassigning regions.
32   * Encapsulates a fixed size thread pool of executors to run assignment/unassignment.
33   * Implement {@link #populatePool(java.util.concurrent.ExecutorService)} and
34   * {@link #waitUntilDone(long)}.  The default implementation of
35   * the {@link #getUncaughtExceptionHandler()} is to abort the hosting
36   * Server.
37   */
38  public abstract class BulkAssigner {
39    protected final Server server;
40  
41    /**
42     * @param server An instance of Server
43     */
44    public BulkAssigner(final Server server) {
45      this.server = server;
46    }
47  
48    /**
49     * @return What to use for a thread prefix when executor runs.
50     */
51    protected String getThreadNamePrefix() {
52      return this.server.getServerName() + "-" + this.getClass().getName(); 
53    }
54  
55    protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
56      return new UncaughtExceptionHandler() {
57        @Override
58        public void uncaughtException(Thread t, Throwable e) {
59          // Abort if exception of any kind.
60          server.abort("Uncaught exception in " + t.getName(), e);
61        }
62      };
63    }
64  
65    protected int getThreadCount() {
66      return this.server.getConfiguration().
67        getInt("hbase.bulk.assignment.threadpool.size", 20);
68    }
69  
70    protected long getTimeoutOnRIT() {
71      return this.server.getConfiguration().
72        getLong("hbase.bulk.assignment.waiton.empty.rit", 5 * 60 * 1000);
73    }
74  
75    protected abstract void populatePool(
76        final java.util.concurrent.ExecutorService pool) throws IOException;
77  
78    public boolean bulkAssign() throws InterruptedException, IOException {
79      return bulkAssign(true);
80    }
81  
82    /**
83     * Run the bulk assign.
84     * 
85     * @param sync
86     *          Whether to assign synchronously.
87     * @throws InterruptedException
88     * @return True if done.
89     * @throws IOException
90     */
91    public boolean bulkAssign(boolean sync) throws InterruptedException,
92        IOException {
93      boolean result = false;
94      ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
95      builder.setDaemon(true);
96      builder.setNameFormat(getThreadNamePrefix() + "-%1$d");
97      builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
98      int threadCount = getThreadCount();
99      java.util.concurrent.ExecutorService pool =
100       Executors.newFixedThreadPool(threadCount, builder.build());
101     try {
102       populatePool(pool);
103       // How long to wait on empty regions-in-transition.  If we timeout, the
104       // RIT monitor should do fixup.
105       if (sync) result = waitUntilDone(getTimeoutOnRIT());
106     } finally {
107       // We're done with the pool.  It'll exit when its done all in queue.
108       pool.shutdown();
109     }
110     return result;
111   }
112 
113   /**
114    * Wait until bulk assign is done.
115    * @param timeout How long to wait.
116    * @throws InterruptedException
117    * @return True if the condition we were waiting on happened.
118    */
119   protected abstract boolean waitUntilDone(final long timeout)
120   throws InterruptedException;
121 }