1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.Map.Entry;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.locks.ReentrantLock;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.RemoteExceptionHandler;
32 import org.apache.hadoop.hbase.Server;
33 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
34 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
35 import org.apache.hadoop.hbase.wal.WAL;
36 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.util.HasThread;
39
40 import java.io.IOException;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.locks.ReentrantLock;
43
44 import com.google.common.annotations.VisibleForTesting;
45
46
47
48
49
50
51
52
53
54
55 @InterfaceAudience.Private
56 @VisibleForTesting
57 public class LogRoller extends HasThread {
58 private static final Log LOG = LogFactory.getLog(LogRoller.class);
59 private final ReentrantLock rollLock = new ReentrantLock();
60 private final AtomicBoolean rollLog = new AtomicBoolean(false);
61 private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
62 new ConcurrentHashMap<WAL, Boolean>();
63 private final Server server;
64 protected final RegionServerServices services;
65 private volatile long lastrolltime = System.currentTimeMillis();
66
67 private final long rollperiod;
68 private final int threadWakeFrequency;
69
70 private long checkLowReplicationInterval;
71
72 public void addWAL(final WAL wal) {
73 if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
74 wal.registerWALActionsListener(new WALActionsListener.Base() {
75 @Override
76 public void logRollRequested(boolean lowReplicas) {
77 walNeedsRoll.put(wal, Boolean.TRUE);
78
79 synchronized(rollLog) {
80 rollLog.set(true);
81 rollLog.notifyAll();
82 }
83 }
84 });
85 }
86 }
87
88 public void requestRollAll() {
89 for (WAL wal : walNeedsRoll.keySet()) {
90 walNeedsRoll.put(wal, Boolean.TRUE);
91 }
92 synchronized(rollLog) {
93 rollLog.set(true);
94 rollLog.notifyAll();
95 }
96 }
97
98
99 public LogRoller(final Server server, final RegionServerServices services) {
100 super("LogRoller");
101 this.server = server;
102 this.services = services;
103 this.rollperiod = this.server.getConfiguration().
104 getLong("hbase.regionserver.logroll.period", 3600000);
105 this.threadWakeFrequency = this.server.getConfiguration().
106 getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
107 this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
108 "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
109 }
110
111 @Override
112 public void interrupt() {
113
114 synchronized (rollLog) {
115 this.rollLog.notify();
116 }
117 super.interrupt();
118 }
119
120
121
122
123 void checkLowReplication(long now) {
124 try {
125 for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
126 WAL wal = entry.getKey();
127 boolean neeRollAlready = entry.getValue();
128 if(wal instanceof FSHLog && !neeRollAlready) {
129 FSHLog hlog = (FSHLog)wal;
130 if ((now - hlog.getLastTimeCheckLowReplication())
131 > this.checkLowReplicationInterval) {
132 hlog.checkLogRoll();
133 }
134 }
135 }
136 } catch (Throwable e) {
137 LOG.warn("Failed checking low replication", e);
138 }
139 }
140
141 @Override
142 public void run() {
143 while (!server.isStopped()) {
144 long now = System.currentTimeMillis();
145 checkLowReplication(now);
146 boolean periodic = false;
147 if (!rollLog.get()) {
148 periodic = (now - this.lastrolltime) > this.rollperiod;
149 if (!periodic) {
150 synchronized (rollLog) {
151 try {
152 if (!rollLog.get()) {
153 rollLog.wait(this.threadWakeFrequency);
154 }
155 } catch (InterruptedException e) {
156
157 }
158 }
159 continue;
160 }
161
162 if (LOG.isDebugEnabled()) {
163 LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
164 }
165 } else if (LOG.isDebugEnabled()) {
166 LOG.debug("WAL roll requested");
167 }
168 rollLock.lock();
169 try {
170 this.lastrolltime = now;
171 for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
172 final WAL wal = entry.getKey();
173
174
175 final byte [][] regionsToFlush = wal.rollWriter(periodic ||
176 entry.getValue().booleanValue());
177 walNeedsRoll.put(wal, Boolean.FALSE);
178 if (regionsToFlush != null) {
179 for (byte [] r: regionsToFlush) scheduleFlush(r);
180 }
181 }
182 } catch (FailedLogCloseException e) {
183 server.abort("Failed log close in log roller", e);
184 } catch (java.net.ConnectException e) {
185 server.abort("Failed log close in log roller", e);
186 } catch (IOException ex) {
187 LOG.fatal("Aborting", ex);
188
189 server.abort("IOE in log roller",
190 RemoteExceptionHandler.checkIOException(ex));
191 } catch (Exception ex) {
192 final String msg = "Failed rolling WAL; aborting to recover edits!";
193 LOG.error(msg, ex);
194 server.abort(msg, ex);
195 } finally {
196 try {
197 rollLog.set(false);
198 } finally {
199 rollLock.unlock();
200 }
201 }
202 }
203 LOG.info("LogRoller exiting.");
204 }
205
206
207
208
209 private void scheduleFlush(final byte [] encodedRegionName) {
210 boolean scheduled = false;
211 Region r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
212 FlushRequester requester = null;
213 if (r != null) {
214 requester = this.services.getFlushRequester();
215 if (requester != null) {
216
217 requester.requestFlush(r, true);
218 scheduled = true;
219 }
220 }
221 if (!scheduled) {
222 LOG.warn("Failed to schedule flush of " +
223 Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
224 requester);
225 }
226 }
227
228
229
230
231
232 @VisibleForTesting
233 public boolean walRollFinished() {
234 for (boolean needRoll : walNeedsRoll.values()) {
235 if (needRoll) {
236 return false;
237 }
238 }
239 return true;
240 }
241 }