1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.ConnectException;
24 import java.net.SocketTimeoutException;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.NotServingRegionException;
33 import org.apache.hadoop.hbase.Server;
34 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
35 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
36 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
37 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
38 import org.apache.hadoop.hbase.wal.WALFactory;
39 import org.apache.hadoop.hbase.wal.WALSplitter;
40 import org.apache.hadoop.hbase.util.CancelableProgressable;
41 import org.apache.hadoop.hbase.util.ExceptionUtil;
42 import org.apache.hadoop.hbase.util.FSUtils;
43
44 import com.google.common.annotations.VisibleForTesting;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 @InterfaceAudience.Private
64 public class SplitLogWorker implements Runnable {
65
66 private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
67
68 Thread worker;
69
70 private SplitLogWorkerCoordination coordination;
71 private Configuration conf;
72 private RegionServerServices server;
73
74 public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
75 TaskExecutor splitTaskExecutor) {
76 this.server = server;
77 this.conf = conf;
78 this.coordination =
79 ((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager())
80 .getSplitLogWorkerCoordination();
81 this.server = server;
82 coordination.init(server, conf, splitTaskExecutor, this);
83 }
84
85 public SplitLogWorker(final Server hserver, final Configuration conf,
86 final RegionServerServices server, final LastSequenceId sequenceIdChecker,
87 final WALFactory factory) {
88 this(server, conf, server, new TaskExecutor() {
89 @Override
90 public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
91 Path rootdir;
92 FileSystem fs;
93 try {
94 rootdir = FSUtils.getRootDir(conf);
95 fs = rootdir.getFileSystem(conf);
96 } catch (IOException e) {
97 LOG.warn("could not find root dir or fs", e);
98 return Status.RESIGNED;
99 }
100
101
102
103 try {
104 if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
105 fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
106 return Status.PREEMPTED;
107 }
108 } catch (InterruptedIOException iioe) {
109 LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
110 return Status.RESIGNED;
111 } catch (IOException e) {
112 Throwable cause = e.getCause();
113 if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
114 || cause instanceof ConnectException
115 || cause instanceof SocketTimeoutException)) {
116 LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
117 + "resigning", e);
118 return Status.RESIGNED;
119 } else if (cause instanceof InterruptedException) {
120 LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
121 return Status.RESIGNED;
122 }
123 LOG.warn("log splitting of " + filename + " failed, returning error", e);
124 return Status.ERR;
125 }
126 return Status.DONE;
127 }
128 });
129 }
130
131 @Override
132 public void run() {
133 try {
134 LOG.info("SplitLogWorker " + server.getServerName() + " starting");
135 coordination.registerListener();
136
137 boolean res = false;
138 while (!res && !coordination.isStop()) {
139 res = coordination.isReady();
140 }
141 if (!coordination.isStop()) {
142 coordination.taskLoop();
143 }
144 } catch (Throwable t) {
145 if (ExceptionUtil.isInterrupt(t)) {
146 LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" :
147 " (ERROR: exitWorker is not set, exiting anyway)"));
148 } else {
149
150
151 LOG.error("unexpected error ", t);
152 }
153 } finally {
154 coordination.removeListener();
155 LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
156 }
157 }
158
159
160
161
162
163 public void stopTask() {
164 LOG.info("Sending interrupt to stop the worker thread");
165 worker.interrupt();
166 }
167
168
169
170
171 public void start() {
172 worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName().toShortString());
173 worker.start();
174 }
175
176
177
178
179 public void stop() {
180 coordination.stopProcessingTasks();
181 stopTask();
182 }
183
184
185
186
187
188
189
190
191
192 public interface TaskExecutor {
193 enum Status {
194 DONE(),
195 ERR(),
196 RESIGNED(),
197 PREEMPTED()
198 }
199 Status exec(String name, RecoveryMode mode, CancelableProgressable p);
200 }
201
202
203
204
205
206 @VisibleForTesting
207 public int getTaskReadySeq() {
208 return coordination.getTaskReadySeq();
209 }
210 }