001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.net.ConnectException;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.atomic.AtomicBoolean;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Runs periodically to determine if the WAL should be rolled.
046 * <p/>
047 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
048 * there is something to do, rather than the Chore sleep time which is invariant.
049 * <p/>
050 * The {@link #scheduleFlush(String, List)} is abstract here, as sometimes we hold a region without
051 * a region server but we still want to roll its WAL.
052 * <p/>
053 * TODO: change to a pool of threads
054 */
055@InterfaceAudience.Private
056public abstract class AbstractWALRoller<T extends Abortable> extends Thread implements Closeable {
057  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);
058
059  protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
060
061  /**
062   * Configure for the timeout of log rolling retry.
063   */
064  public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
065  public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000;
066
067  /**
068   * Configure for the max count of log rolling retry. The real retry count is also limited by the
069   * timeout of log rolling via {@link #WAL_ROLL_WAIT_TIMEOUT}
070   */
071  protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries";
072
073  protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
074  protected final T abortable;
075  // Period to roll log.
076  private final long rollPeriod;
077  private final int threadWakeFrequency;
078  // The interval to check low replication on hlog's pipeline
079  private final long checkLowReplicationInterval;
080  // Wait period for roll log
081  private final long rollWaitTimeout;
082  // Max retry for roll log
083  private final int maxRollRetry;
084
085  private volatile boolean running = true;
086
087  public void addWAL(WAL wal) {
088    // check without lock first
089    if (wals.containsKey(wal)) {
090      return;
091    }
092    // this is to avoid race between addWAL and requestRollAll.
093    synchronized (this) {
094      if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
095        wal.registerWALActionsListener(new WALActionsListener() {
096          @Override
097          public void logRollRequested(WALActionsListener.RollRequestReason reason) {
098            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
099            synchronized (AbstractWALRoller.this) {
100              RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
101              controller.requestRoll();
102              AbstractWALRoller.this.notifyAll();
103            }
104          }
105
106          @Override
107          public void postLogArchive(Path oldPath, Path newPath) throws IOException {
108            afterWALArchive(oldPath, newPath);
109          }
110        });
111      }
112    }
113  }
114
115  public void requestRollAll() {
116    synchronized (this) {
117      for (RollController controller : wals.values()) {
118        controller.requestRoll();
119      }
120      notifyAll();
121    }
122  }
123
124  protected AbstractWALRoller(String name, Configuration conf, T abortable) {
125    super(name);
126    this.abortable = abortable;
127    this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
128    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
129    this.checkLowReplicationInterval =
130      conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
131    this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
132    // retry rolling does not have to be the default behavior, so the default value is 0 here
133    this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
134  }
135
136  /**
137   * we need to check low replication in period, see HBASE-18132
138   */
139  private void checkLowReplication(long now) {
140    try {
141      for (Entry<WAL, RollController> entry : wals.entrySet()) {
142        WAL wal = entry.getKey();
143        boolean needRollAlready = entry.getValue().needsRoll(now);
144        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
145          continue;
146        }
147        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
148      }
149    } catch (Throwable e) {
150      LOG.warn("Failed checking low replication", e);
151    }
152  }
153
154  private void abort(String reason, Throwable cause) {
155    // close all WALs before calling abort on RS.
156    // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
157    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
158    // is already broken.
159    for (WAL wal : wals.keySet()) {
160      // shutdown rather than close here since we are going to abort the RS and the wals need to be
161      // split when recovery
162      try {
163        wal.shutdown();
164      } catch (IOException e) {
165        LOG.warn("Failed to shutdown wal", e);
166      }
167    }
168    abortable.abort(reason, cause);
169  }
170
171  @Override
172  public void run() {
173    while (running) {
174      long now = EnvironmentEdgeManager.currentTime();
175      checkLowReplication(now);
176      synchronized (this) {
177        if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
178          try {
179            wait(this.threadWakeFrequency);
180          } catch (InterruptedException e) {
181            // restore the interrupt state
182            Thread.currentThread().interrupt();
183          }
184          // goto the beginning to check whether again whether we should fall through to roll
185          // several WALs, and also check whether we should quit.
186          continue;
187        }
188      }
189      try {
190        for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator(); iter
191          .hasNext();) {
192          Entry<WAL, RollController> entry = iter.next();
193          WAL wal = entry.getKey();
194          RollController controller = entry.getValue();
195          if (controller.isRollRequested()) {
196            // WAL roll requested, fall through
197            LOG.debug("WAL {} roll requested", wal);
198          } else if (controller.needsPeriodicRoll(now)) {
199            // Time for periodic roll, fall through
200            LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
201          } else {
202            continue;
203          }
204          Map<byte[], List<byte[]>> regionsToFlush = null;
205          int nAttempts = 0;
206          long startWaiting = System.currentTimeMillis();
207          do {
208            try {
209              // Force the roll if the logroll.period is elapsed or if a roll was requested.
210              // The returned value is an collection of actual region and family names.
211              regionsToFlush = controller.rollWal(System.currentTimeMillis());
212              break;
213            } catch (IOException ioe) {
214              long waitingTime = System.currentTimeMillis() - startWaiting;
215              if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) {
216                nAttempts++;
217                LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
218                  + " last exception", nAttempts, waitingTime, ioe);
219                sleep(1000);
220              } else {
221                LOG.error("Roll wal failed and waiting timeout, will not retry", ioe);
222                throw ioe;
223              }
224            }
225          } while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout);
226          if (regionsToFlush != null) {
227            for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
228              scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
229            }
230          }
231        }
232      } catch (FailedLogCloseException | ConnectException e) {
233        abort("Failed log close in log roller", e);
234      } catch (IOException ex) {
235        // Abort if we get here. We probably won't recover an IOE. HBASE-1132
236        abort("IOE in log roller",
237          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
238      } catch (Exception ex) {
239        LOG.error("Log rolling failed", ex);
240        abort("Log rolling failed", ex);
241      }
242    }
243    LOG.info("LogRoller exiting.");
244  }
245
246  protected void afterWALArchive(Path oldPath, Path newPath) {
247  }
248
249  /**
250   * @param encodedRegionName Encoded name of region to flush.
251   * @param families          stores of region to flush.
252   */
253  protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families);
254
255  private boolean isWaiting() {
256    Thread.State state = getState();
257    return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
258  }
259
260  /** Returns true if all WAL roll finished */
261  public boolean walRollFinished() {
262    // TODO add a status field of roll in RollController
263    return wals.values().stream()
264      .noneMatch(rc -> rc.needsRoll(EnvironmentEdgeManager.currentTime())) && isWaiting();
265  }
266
267  /**
268   * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
269   */
270  public void waitUntilWalRollFinished() throws InterruptedException {
271    while (!walRollFinished()) {
272      Thread.sleep(100);
273    }
274  }
275
276  @Override
277  public void close() {
278    running = false;
279    interrupt();
280  }
281
282  /**
283   * Independently control the roll of each wal. When use multiwal, can avoid all wal roll together.
284   * see HBASE-24665 for detail
285   */
286  protected class RollController {
287    private final WAL wal;
288    private final AtomicBoolean rollRequest;
289    private long lastRollTime;
290
291    RollController(WAL wal) {
292      this.wal = wal;
293      this.rollRequest = new AtomicBoolean(false);
294      this.lastRollTime = EnvironmentEdgeManager.currentTime();
295    }
296
297    public void requestRoll() {
298      this.rollRequest.set(true);
299    }
300
301    public Map<byte[], List<byte[]>> rollWal(long now) throws IOException {
302      this.lastRollTime = now;
303      // reset the flag in front to avoid missing roll request before we return from rollWriter.
304      this.rollRequest.set(false);
305      return wal.rollWriter(true);
306    }
307
308    public boolean isRollRequested() {
309      return rollRequest.get();
310    }
311
312    public boolean needsPeriodicRoll(long now) {
313      return (now - this.lastRollTime) > rollPeriod;
314    }
315
316    public boolean needsRoll(long now) {
317      return isRollRequested() || needsPeriodicRoll(now);
318    }
319  }
320}