1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.snapshot;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorCompletionService;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.classification.InterfaceStability;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.Abortable;
39 import org.apache.hadoop.hbase.DaemonThreadFactory;
40 import org.apache.hadoop.hbase.DroppedSnapshotException;
41 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
44 import org.apache.hadoop.hbase.errorhandling.ForeignException;
45 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
46 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
47 import org.apache.hadoop.hbase.procedure.ProcedureMember;
48 import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
49 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
50 import org.apache.hadoop.hbase.procedure.Subprocedure;
51 import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
52 import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
53 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
54 import org.apache.hadoop.hbase.regionserver.HRegionServer;
55 import org.apache.hadoop.hbase.regionserver.Region;
56 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
57 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
58 import org.apache.zookeeper.KeeperException;
59
60 import com.google.protobuf.InvalidProtocolBufferException;
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
76 @InterfaceStability.Unstable
77 public class RegionServerSnapshotManager extends RegionServerProcedureManager {
78 private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
79
80
81 private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
82 private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
83
84
85 public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
86
87 public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
88
89
90 public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
91
92 public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000;
93
94
95 public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = "hbase.snapshot.region.wakefrequency";
96
97 private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
98
99 private RegionServerServices rss;
100 private ProcedureMemberRpcs memberRpcs;
101 private ProcedureMember member;
102
103
104
105
106
107
108
109
110 RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
111 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
112 this.rss = parent;
113 this.memberRpcs = memberRpc;
114 this.member = procMember;
115 }
116
117 public RegionServerSnapshotManager() {}
118
119
120
121
122 @Override
123 public void start() {
124 LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
125 this.memberRpcs.start(rss.getServerName().toString(), member);
126 }
127
128
129
130
131
132
133 @Override
134 public void stop(boolean force) throws IOException {
135 String mode = force ? "abruptly" : "gracefully";
136 LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
137
138 try {
139 this.member.close();
140 } finally {
141 this.memberRpcs.close();
142 }
143 }
144
145
146
147
148
149
150
151
152
153
154
155 public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
156
157
158 if (rss.isStopping() || rss.isStopped()) {
159 throw new IllegalStateException("Can't start snapshot on RS: " + rss.getServerName()
160 + ", because stopping/stopped!");
161 }
162
163
164
165 List<Region> involvedRegions;
166 try {
167 involvedRegions = getRegionsToSnapshot(snapshot);
168 } catch (IOException e1) {
169 throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
170 + "something has gone awry with the online regions.", e1);
171 }
172
173
174
175
176
177 LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
178 + snapshot.getTable() + " type " + snapshot.getType());
179 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
180 Configuration conf = rss.getConfiguration();
181 long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
182 SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
183 long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY,
184 SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
185
186 switch (snapshot.getType()) {
187 case FLUSH:
188 SnapshotSubprocedurePool taskManager =
189 new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
190 return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
191 timeoutMillis, involvedRegions, snapshot, taskManager);
192 case SKIPFLUSH:
193
194
195
196
197
198
199
200 SnapshotSubprocedurePool taskManager2 =
201 new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
202 return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
203 timeoutMillis, involvedRegions, snapshot, taskManager2);
204
205 default:
206 throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
207 }
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225 private List<Region> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException {
226 List<Region> onlineRegions = rss.getOnlineRegions(TableName.valueOf(snapshot.getTable()));
227 Iterator<Region> iterator = onlineRegions.iterator();
228
229 while (iterator.hasNext()) {
230 Region r = iterator.next();
231 if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
232 iterator.remove();
233 }
234 }
235 return onlineRegions;
236 }
237
238
239
240
241 public class SnapshotSubprocedureBuilder implements SubprocedureFactory {
242
243 @Override
244 public Subprocedure buildSubprocedure(String name, byte[] data) {
245 try {
246
247 SnapshotDescription snapshot = SnapshotDescription.parseFrom(data);
248 return RegionServerSnapshotManager.this.buildSubprocedure(snapshot);
249 } catch (InvalidProtocolBufferException e) {
250 throw new IllegalArgumentException("Could not read snapshot information from request.");
251 }
252 }
253
254 }
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269 static class SnapshotSubprocedurePool {
270 private final Abortable abortable;
271 private final ExecutorCompletionService<Void> taskPool;
272 private final ThreadPoolExecutor executor;
273 private volatile boolean stopped;
274 private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
275 private final String name;
276
277 SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
278 this.abortable = abortable;
279
280 long keepAlive = conf.getLong(
281 RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
282 RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
283 int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
284 this.name = name;
285 executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
286 new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
287 + name + ")-snapshot-pool"));
288 taskPool = new ExecutorCompletionService<Void>(executor);
289 }
290
291 boolean hasTasks() {
292 return futures.size() != 0;
293 }
294
295
296
297
298
299
300
301 void submitTask(final Callable<Void> task) {
302 Future<Void> f = this.taskPool.submit(task);
303 futures.add(f);
304 }
305
306
307
308
309
310
311
312
313
314 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
315 LOG.debug("Waiting for local region snapshots to finish.");
316
317 int sz = futures.size();
318 try {
319
320 for (int i = 0; i < sz; i++) {
321 Future<Void> f = taskPool.take();
322 f.get();
323 if (!futures.remove(f)) {
324 LOG.warn("unexpected future" + f);
325 }
326 LOG.debug("Completed " + (i+1) + "/" + sz + " local region snapshots.");
327 }
328 LOG.debug("Completed " + sz + " local region snapshots.");
329 return true;
330 } catch (InterruptedException e) {
331 LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
332 if (!stopped) {
333 Thread.currentThread().interrupt();
334 throw new ForeignException("SnapshotSubprocedurePool", e);
335 }
336
337 } catch (ExecutionException e) {
338 Throwable cause = e.getCause();
339 if (cause instanceof ForeignException) {
340 LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
341 throw (ForeignException)e.getCause();
342 } else if (cause instanceof DroppedSnapshotException) {
343
344 abortable.abort("Received DroppedSnapshotException, aborting", cause);
345 }
346 LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
347 throw new ForeignException(name, e.getCause());
348 } finally {
349 cancelTasks();
350 }
351 return false;
352 }
353
354
355
356
357
358 void cancelTasks() throws InterruptedException {
359 Collection<Future<Void>> tasks = futures;
360 LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
361 for (Future<Void> f: tasks) {
362
363
364
365
366 f.cancel(false);
367 }
368
369
370 futures.clear();
371 while (taskPool.poll() != null) {}
372 stop();
373 }
374
375
376
377
378 void stop() {
379 if (this.stopped) return;
380
381 this.stopped = true;
382 this.executor.shutdown();
383 }
384 }
385
386
387
388
389
390
391 @Override
392 public void initialize(RegionServerServices rss) throws KeeperException {
393 this.rss = rss;
394 ZooKeeperWatcher zkw = rss.getZooKeeper();
395 this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
396 SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
397
398
399 Configuration conf = rss.getConfiguration();
400 long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
401 int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
402
403
404 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
405 opThreads, keepAlive);
406 this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
407 }
408
409 @Override
410 public String getProcedureSignature() {
411 return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
412 }
413
414 }