1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.handler;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.Server;
28 import org.apache.hadoop.hbase.ServerName;
29 import org.apache.hadoop.hbase.SplitLogCounters;
30 import org.apache.hadoop.hbase.SplitLogTask;
31 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
32 import org.apache.hadoop.hbase.executor.EventHandler;
33 import org.apache.hadoop.hbase.executor.EventType;
34 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
35 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
36 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
37 import org.apache.hadoop.hbase.util.CancelableProgressable;
38
39
40
41
42 @InterfaceAudience.Private
43 public class WALSplitterHandler extends EventHandler {
44 private static final Log LOG = LogFactory.getLog(WALSplitterHandler.class);
45 private final ServerName serverName;
46 private final CancelableProgressable reporter;
47 private final AtomicInteger inProgressTasks;
48 private final TaskExecutor splitTaskExecutor;
49 private final RecoveryMode mode;
50 private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
51 private final SplitLogWorkerCoordination coordination;
52
53
54 public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
55 SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
56 AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
57 super(server, EventType.RS_LOG_REPLAY);
58 this.splitTaskDetails = splitDetails;
59 this.coordination = coordination;
60 this.reporter = reporter;
61 this.inProgressTasks = inProgressTasks;
62 this.inProgressTasks.incrementAndGet();
63 this.serverName = server.getServerName();
64 this.splitTaskExecutor = splitTaskExecutor;
65 this.mode = mode;
66 }
67
68 @Override
69 public void process() throws IOException {
70 long startTime = System.currentTimeMillis();
71 try {
72 Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
73 switch (status) {
74 case DONE:
75 coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
76 SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
77 break;
78 case PREEMPTED:
79 SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
80 LOG.warn("task execution prempted " + splitTaskDetails.getWALFile());
81 break;
82 case ERR:
83 if (server != null && !server.isStopped()) {
84 coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
85 SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
86 break;
87 }
88
89
90
91 case RESIGNED:
92 if (server != null && server.isStopped()) {
93 LOG.info("task execution interrupted because worker is exiting "
94 + splitTaskDetails.toString());
95 }
96 coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
97 SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
98 break;
99 }
100 } finally {
101 LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
102 + (System.currentTimeMillis() - startTime) + "ms");
103 this.inProgressTasks.decrementAndGet();
104 }
105 }
106 }