1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.Map.Entry;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.locks.ReentrantLock;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.RemoteExceptionHandler;
32 import org.apache.hadoop.hbase.Server;
33 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
34 import org.apache.hadoop.hbase.wal.WAL;
35 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.HasThread;
38
39 import java.io.IOException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.locks.ReentrantLock;
42
43 import com.google.common.annotations.VisibleForTesting;
44
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 @VisibleForTesting
56 public class LogRoller extends HasThread {
57 static final Log LOG = LogFactory.getLog(LogRoller.class);
58 private final ReentrantLock rollLock = new ReentrantLock();
59 private final AtomicBoolean rollLog = new AtomicBoolean(false);
60 private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
61 new ConcurrentHashMap<WAL, Boolean>();
62 private final Server server;
63 protected final RegionServerServices services;
64 private volatile long lastrolltime = System.currentTimeMillis();
65
66 private final long rollperiod;
67 private final int threadWakeFrequency;
68
69 public void addWAL(final WAL wal) {
70 if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
71 wal.registerWALActionsListener(new WALActionsListener.Base() {
72 @Override
73 public void logRollRequested(boolean lowReplicas) {
74 walNeedsRoll.put(wal, Boolean.TRUE);
75
76 synchronized(rollLog) {
77 rollLog.set(true);
78 rollLog.notifyAll();
79 }
80 }
81 });
82 }
83 }
84
85 public void requestRollAll() {
86 for (WAL wal : walNeedsRoll.keySet()) {
87 walNeedsRoll.put(wal, Boolean.TRUE);
88 }
89 synchronized(rollLog) {
90 rollLog.set(true);
91 rollLog.notifyAll();
92 }
93 }
94
95
96 public LogRoller(final Server server, final RegionServerServices services) {
97 super();
98 this.server = server;
99 this.services = services;
100 this.rollperiod = this.server.getConfiguration().
101 getLong("hbase.regionserver.logroll.period", 3600000);
102 this.threadWakeFrequency = this.server.getConfiguration().
103 getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
104 }
105
106 @Override
107 public void interrupt() {
108
109 synchronized (rollLog) {
110 this.rollLog.notify();
111 }
112 super.interrupt();
113 }
114
115 @Override
116 public void run() {
117 while (!server.isStopped()) {
118 long now = System.currentTimeMillis();
119 boolean periodic = false;
120 if (!rollLog.get()) {
121 periodic = (now - this.lastrolltime) > this.rollperiod;
122 if (!periodic) {
123 synchronized (rollLog) {
124 try {
125 if (!rollLog.get()) {
126 rollLog.wait(this.threadWakeFrequency);
127 }
128 } catch (InterruptedException e) {
129
130 }
131 }
132 continue;
133 }
134
135 if (LOG.isDebugEnabled()) {
136 LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
137 }
138 } else if (LOG.isDebugEnabled()) {
139 LOG.debug("WAL roll requested");
140 }
141 rollLock.lock();
142 try {
143 this.lastrolltime = now;
144 for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
145 final WAL wal = entry.getKey();
146
147
148 final byte [][] regionsToFlush = wal.rollWriter(periodic ||
149 entry.getValue().booleanValue());
150 walNeedsRoll.put(wal, Boolean.FALSE);
151 if (regionsToFlush != null) {
152 for (byte [] r: regionsToFlush) scheduleFlush(r);
153 }
154 }
155 } catch (FailedLogCloseException e) {
156 server.abort("Failed log close in log roller", e);
157 } catch (java.net.ConnectException e) {
158 server.abort("Failed log close in log roller", e);
159 } catch (IOException ex) {
160 LOG.fatal("Aborting", ex);
161
162 server.abort("IOE in log roller",
163 RemoteExceptionHandler.checkIOException(ex));
164 } catch (Exception ex) {
165 final String msg = "Failed rolling WAL; aborting to recover edits!";
166 LOG.error(msg, ex);
167 server.abort(msg, ex);
168 } finally {
169 try {
170 rollLog.set(false);
171 } finally {
172 rollLock.unlock();
173 }
174 }
175 }
176 LOG.info("LogRoller exiting.");
177 }
178
179
180
181
182 private void scheduleFlush(final byte [] encodedRegionName) {
183 boolean scheduled = false;
184 Region r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
185 FlushRequester requester = null;
186 if (r != null) {
187 requester = this.services.getFlushRequester();
188 if (requester != null) {
189
190 requester.requestFlush(r, true);
191 scheduled = true;
192 }
193 }
194 if (!scheduled) {
195 LOG.warn("Failed to schedule flush of " +
196 Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
197 requester);
198 }
199 }
200 }