1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.net.ConnectException;
25 import java.net.SocketTimeoutException;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.NotServingRegionException;
34 import org.apache.hadoop.hbase.Server;
35 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
36 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
37 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
38 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
39 import org.apache.hadoop.hbase.wal.WALFactory;
40 import org.apache.hadoop.hbase.wal.WALSplitter;
41 import org.apache.hadoop.hbase.util.CancelableProgressable;
42 import org.apache.hadoop.hbase.util.ExceptionUtil;
43 import org.apache.hadoop.hbase.util.FSUtils;
44
45 import com.google.common.annotations.VisibleForTesting;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 public class SplitLogWorker implements Runnable {
66
67 private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
68
69 Thread worker;
70
71 private SplitLogWorkerCoordination coordination;
72 private Configuration conf;
73 private RegionServerServices server;
74
75 public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
76 TaskExecutor splitTaskExecutor) {
77 this.server = server;
78 this.conf = conf;
79 this.coordination =
80 ((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager())
81 .getSplitLogWorkerCoordination();
82 this.server = server;
83 coordination.init(server, conf, splitTaskExecutor, this);
84 }
85
86 public SplitLogWorker(final Server hserver, final Configuration conf,
87 final RegionServerServices server, final LastSequenceId sequenceIdChecker,
88 final WALFactory factory) {
89 this(server, conf, server, new TaskExecutor() {
90 @Override
91 public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
92 Path rootdir;
93 FileSystem fs;
94 try {
95 rootdir = FSUtils.getRootDir(conf);
96 fs = rootdir.getFileSystem(conf);
97 } catch (IOException e) {
98 LOG.warn("could not find root dir or fs", e);
99 return Status.RESIGNED;
100 }
101
102
103
104 try {
105 if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
106 fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
107 return Status.PREEMPTED;
108 }
109 } catch (InterruptedIOException iioe) {
110 LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
111 return Status.RESIGNED;
112 } catch (IOException e) {
113 if (e instanceof FileNotFoundException) {
114
115 LOG.warn("WAL " + filename + " does not exist anymore", e);
116 return Status.DONE;
117 }
118 Throwable cause = e.getCause();
119 if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
120 || cause instanceof ConnectException
121 || cause instanceof SocketTimeoutException)) {
122 LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
123 + "resigning", e);
124 return Status.RESIGNED;
125 } else if (cause instanceof InterruptedException) {
126 LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
127 return Status.RESIGNED;
128 }
129 LOG.warn("log splitting of " + filename + " failed, returning error", e);
130 return Status.ERR;
131 }
132 return Status.DONE;
133 }
134 });
135 }
136
137 @Override
138 public void run() {
139 try {
140 LOG.info("SplitLogWorker " + server.getServerName() + " starting");
141 coordination.registerListener();
142
143 boolean res = false;
144 while (!res && !coordination.isStop()) {
145 res = coordination.isReady();
146 }
147 if (!coordination.isStop()) {
148 coordination.taskLoop();
149 }
150 } catch (Throwable t) {
151 if (ExceptionUtil.isInterrupt(t)) {
152 LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" :
153 " (ERROR: exitWorker is not set, exiting anyway)"));
154 } else {
155
156
157 LOG.error("unexpected error ", t);
158 }
159 } finally {
160 coordination.removeListener();
161 LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
162 }
163 }
164
165
166
167
168
169 public void stopTask() {
170 LOG.info("Sending interrupt to stop the worker thread");
171 worker.interrupt();
172 }
173
174
175
176
177 public void start() {
178 worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName().toShortString());
179 worker.start();
180 }
181
182
183
184
185 public void stop() {
186 coordination.stopProcessingTasks();
187 stopTask();
188 }
189
190
191
192
193
194
195
196
197
198 public interface TaskExecutor {
199 enum Status {
200 DONE(),
201 ERR(),
202 RESIGNED(),
203 PREEMPTED()
204 }
205 Status exec(String name, RecoveryMode mode, CancelableProgressable p);
206 }
207
208
209
210
211
212 @VisibleForTesting
213 public int getTaskReadySeq() {
214 return coordination.getTaskReadySeq();
215 }
216 }