1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure.flush;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorCompletionService;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Abortable;
37 import org.apache.hadoop.hbase.DaemonThreadFactory;
38 import org.apache.hadoop.hbase.DroppedSnapshotException;
39 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.errorhandling.ForeignException;
42 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
43 import org.apache.hadoop.hbase.procedure.ProcedureMember;
44 import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
45 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
46 import org.apache.hadoop.hbase.procedure.Subprocedure;
47 import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
48 import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
49 import org.apache.hadoop.hbase.regionserver.HRegionServer;
50 import org.apache.hadoop.hbase.regionserver.Region;
51 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
52 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
53 import org.apache.zookeeper.KeeperException;
54
55
56
57
58 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
59 public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager {
60 private static final Log LOG = LogFactory.getLog(RegionServerFlushTableProcedureManager.class);
61
62 private static final String CONCURENT_FLUSH_TASKS_KEY =
63 "hbase.flush.procedure.region.concurrentTasks";
64 private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;
65
66 public static final String FLUSH_REQUEST_THREADS_KEY =
67 "hbase.flush.procedure.region.pool.threads";
68 public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;
69
70 public static final String FLUSH_TIMEOUT_MILLIS_KEY =
71 "hbase.flush.procedure.region.timeout";
72 public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
73
74 public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY =
75 "hbase.flush.procedure.region.wakefrequency";
76 private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500;
77
78 private RegionServerServices rss;
79 private ProcedureMemberRpcs memberRpcs;
80 private ProcedureMember member;
81
82
83
84
85
86
87
88
89 RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server,
90 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
91 this.rss = server;
92 this.memberRpcs = memberRpc;
93 this.member = procMember;
94 }
95
96 public RegionServerFlushTableProcedureManager() {}
97
98
99
100
101 @Override
102 public void start() {
103 LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString());
104 this.memberRpcs.start(rss.getServerName().toString(), member);
105 }
106
107
108
109
110
111
112 @Override
113 public void stop(boolean force) throws IOException {
114 String mode = force ? "abruptly" : "gracefully";
115 LOG.info("Stopping region server flush procedure manager " + mode + ".");
116
117 try {
118 this.member.close();
119 } finally {
120 this.memberRpcs.close();
121 }
122 }
123
124
125
126
127
128
129
130
131
132
133 public Subprocedure buildSubprocedure(String table) {
134
135
136 if (rss.isStopping() || rss.isStopped()) {
137 throw new IllegalStateException("Can't start flush region subprocedure on RS: "
138 + rss.getServerName() + ", because stopping/stopped!");
139 }
140
141
142 List<Region> involvedRegions;
143 try {
144 involvedRegions = getRegionsToFlush(table);
145 } catch (IOException e1) {
146 throw new IllegalStateException("Failed to figure out if there is region to flush.", e1);
147 }
148
149
150
151
152
153 LOG.debug("Launching subprocedure to flush regions for " + table);
154 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table);
155 Configuration conf = rss.getConfiguration();
156 long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY,
157 FLUSH_TIMEOUT_MILLIS_DEFAULT);
158 long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY,
159 FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
160
161 FlushTableSubprocedurePool taskManager =
162 new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
163 return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
164 timeoutMillis, involvedRegions, table, taskManager);
165 }
166
167
168
169
170
171
172
173
174
175
176
177 private List<Region> getRegionsToFlush(String table) throws IOException {
178 return rss.getOnlineRegions(TableName.valueOf(table));
179 }
180
181 public class FlushTableSubprocedureBuilder implements SubprocedureFactory {
182
183 @Override
184 public Subprocedure buildSubprocedure(String name, byte[] data) {
185
186 return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
187 }
188
189 }
190
191
192
193
194
195
196
197
198
199 static class FlushTableSubprocedurePool {
200 private final Abortable abortable;
201 private final ExecutorCompletionService<Void> taskPool;
202 private final ThreadPoolExecutor executor;
203 private volatile boolean stopped;
204 private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
205 private final String name;
206
207 FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
208 this.abortable = abortable;
209
210 long keepAlive = conf.getLong(
211 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
212 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
213 int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
214 this.name = name;
215 executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
216 new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
217 + name + ")-flush-proc-pool"));
218 taskPool = new ExecutorCompletionService<Void>(executor);
219 }
220
221 boolean hasTasks() {
222 return futures.size() != 0;
223 }
224
225
226
227
228
229
230 void submitTask(final Callable<Void> task) {
231 Future<Void> f = this.taskPool.submit(task);
232 futures.add(f);
233 }
234
235
236
237
238
239
240
241
242 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
243 LOG.debug("Waiting for local region flush to finish.");
244
245 int sz = futures.size();
246 try {
247
248 for (int i = 0; i < sz; i++) {
249 Future<Void> f = taskPool.take();
250 f.get();
251 if (!futures.remove(f)) {
252 LOG.warn("unexpected future" + f);
253 }
254 LOG.debug("Completed " + (i+1) + "/" + sz + " local region flush tasks.");
255 }
256 LOG.debug("Completed " + sz + " local region flush tasks.");
257 return true;
258 } catch (InterruptedException e) {
259 LOG.warn("Got InterruptedException in FlushSubprocedurePool", e);
260 if (!stopped) {
261 Thread.currentThread().interrupt();
262 throw new ForeignException("FlushSubprocedurePool", e);
263 }
264
265 } catch (ExecutionException e) {
266 Throwable cause = e.getCause();
267 if (cause instanceof ForeignException) {
268 LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
269 throw (ForeignException)e.getCause();
270 } else if (cause instanceof DroppedSnapshotException) {
271
272 abortable.abort("Received DroppedSnapshotException, aborting", cause);
273 }
274 LOG.warn("Got Exception in FlushSubprocedurePool", e);
275 throw new ForeignException(name, e.getCause());
276 } finally {
277 cancelTasks();
278 }
279 return false;
280 }
281
282
283
284
285
286
287 void cancelTasks() throws InterruptedException {
288 Collection<Future<Void>> tasks = futures;
289 LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name);
290 for (Future<Void> f: tasks) {
291 f.cancel(false);
292 }
293
294
295 futures.clear();
296 while (taskPool.poll() != null) {}
297 stop();
298 }
299
300
301
302
303
304 void stop() {
305 if (this.stopped) return;
306
307 this.stopped = true;
308 this.executor.shutdown();
309 }
310 }
311
312
313
314
315
316
317
318 @Override
319 public void initialize(RegionServerServices rss) throws KeeperException {
320 this.rss = rss;
321 ZooKeeperWatcher zkw = rss.getZooKeeper();
322 this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
323 MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE);
324
325 Configuration conf = rss.getConfiguration();
326 long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
327 int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT);
328
329
330 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
331 opThreads, keepAlive);
332 this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder());
333 }
334
335 @Override
336 public String getProcedureSignature() {
337 return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE;
338 }
339
340 }