1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.util.concurrent.Executors;
24
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.Server;
27
28 import com.google.common.util.concurrent.ThreadFactoryBuilder;
29
30
31
32
33
34
35
36
37
38 @InterfaceAudience.Private
39 public abstract class BulkAssigner {
40 protected final Server server;
41
42
43
44
45 public BulkAssigner(final Server server) {
46 this.server = server;
47 }
48
49
50
51
52 protected String getThreadNamePrefix() {
53 return this.server.getServerName() + "-" + this.getClass().getName();
54 }
55
56 protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
57 return new UncaughtExceptionHandler() {
58 @Override
59 public void uncaughtException(Thread t, Throwable e) {
60
61 server.abort("Uncaught exception in " + t.getName(), e);
62 }
63 };
64 }
65
66 protected int getThreadCount() {
67 return this.server.getConfiguration().
68 getInt("hbase.bulk.assignment.threadpool.size", 20);
69 }
70
71 protected long getTimeoutOnRIT() {
72 return this.server.getConfiguration().
73 getLong("hbase.bulk.assignment.waiton.empty.rit", 5 * 60 * 1000);
74 }
75
76 protected abstract void populatePool(
77 final java.util.concurrent.ExecutorService pool) throws IOException;
78
79 public boolean bulkAssign() throws InterruptedException, IOException {
80 return bulkAssign(true);
81 }
82
83
84
85
86
87
88
89
90
91
92 public boolean bulkAssign(boolean sync) throws InterruptedException,
93 IOException {
94 boolean result = false;
95 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
96 builder.setDaemon(true);
97 builder.setNameFormat(getThreadNamePrefix() + "-%1$d");
98 builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
99 int threadCount = getThreadCount();
100 java.util.concurrent.ExecutorService pool =
101 Executors.newFixedThreadPool(threadCount, builder.build());
102 try {
103 populatePool(pool);
104
105
106 if (sync) result = waitUntilDone(getTimeoutOnRIT());
107 } finally {
108
109 pool.shutdown();
110 }
111 return result;
112 }
113
114
115
116
117
118
119
120 protected abstract boolean waitUntilDone(final long timeout)
121 throws InterruptedException;
122 }