001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.net.ConnectException;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.atomic.AtomicBoolean;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
037import org.apache.hadoop.hbase.regionserver.wal.WALClosedException;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.hadoop.ipc.RemoteException;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Runs periodically to determine if the WAL should be rolled.
047 * <p/>
048 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
049 * there is something to do, rather than the Chore sleep time which is invariant.
050 * <p/>
051 * The {@link #scheduleFlush(String, List)} is abstract here, as sometimes we hold a region without
052 * a region server but we still want to roll its WAL.
053 * <p/>
054 * TODO: change to a pool of threads
055 */
056@InterfaceAudience.Private
057public abstract class AbstractWALRoller<T extends Abortable> extends Thread implements Closeable {
058  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);
059
060  protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
061
062  /**
063   * Configure for the timeout of log rolling retry.
064   */
065  public static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
066  public static final long DEFAULT_WAL_ROLL_WAIT_TIMEOUT = 30000;
067
068  /**
069   * Configure for the max count of log rolling retry. The real retry count is also limited by the
070   * timeout of log rolling via {@link #WAL_ROLL_WAIT_TIMEOUT}
071   */
072  protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries";
073
074  protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
075  protected final T abortable;
076  // Period to roll log.
077  private final long rollPeriod;
078  private final int threadWakeFrequency;
079  // The interval to check low replication on hlog's pipeline
080  private final long checkLowReplicationInterval;
081  // Wait period for roll log
082  private final long rollWaitTimeout;
083  // Max retry for roll log
084  private final int maxRollRetry;
085
086  private volatile boolean running = true;
087
088  public void addWAL(WAL wal) {
089    // check without lock first
090    if (wals.containsKey(wal)) {
091      return;
092    }
093    // this is to avoid race between addWAL and requestRollAll.
094    synchronized (this) {
095      if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
096        wal.registerWALActionsListener(new WALActionsListener() {
097          @Override
098          public void logRollRequested(WALActionsListener.RollRequestReason reason) {
099            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
100            synchronized (AbstractWALRoller.this) {
101              RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
102              controller.requestRoll();
103              AbstractWALRoller.this.notifyAll();
104            }
105          }
106
107          @Override
108          public void postLogArchive(Path oldPath, Path newPath) throws IOException {
109            afterWALArchive(oldPath, newPath);
110          }
111        });
112      }
113    }
114  }
115
116  public void requestRollAll() {
117    synchronized (this) {
118      for (RollController controller : wals.values()) {
119        controller.requestRoll();
120      }
121      notifyAll();
122    }
123  }
124
125  protected AbstractWALRoller(String name, Configuration conf, T abortable) {
126    super(name);
127    this.abortable = abortable;
128    this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
129    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
130    this.checkLowReplicationInterval =
131      conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
132    this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, DEFAULT_WAL_ROLL_WAIT_TIMEOUT);
133    // retry rolling does not have to be the default behavior, so the default value is 0 here
134    this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
135  }
136
137  /**
138   * we need to check low replication in period, see HBASE-18132
139   */
140  private void checkLowReplication(long now) {
141    try {
142      for (Entry<WAL, RollController> entry : wals.entrySet()) {
143        WAL wal = entry.getKey();
144        boolean needRollAlready = entry.getValue().needsRoll(now);
145        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
146          continue;
147        }
148        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
149      }
150    } catch (Throwable e) {
151      LOG.warn("Failed checking low replication", e);
152    }
153  }
154
155  private void abort(String reason, Throwable cause) {
156    // close all WALs before calling abort on RS.
157    // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
158    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
159    // is already broken.
160    for (WAL wal : wals.keySet()) {
161      // shutdown rather than close here since we are going to abort the RS and the wals need to be
162      // split when recovery
163      try {
164        wal.shutdown();
165      } catch (IOException e) {
166        LOG.warn("Failed to shutdown wal", e);
167      }
168    }
169    abortable.abort(reason, cause);
170  }
171
172  @Override
173  public void run() {
174    while (running) {
175      long now = EnvironmentEdgeManager.currentTime();
176      checkLowReplication(now);
177      synchronized (this) {
178        if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
179          try {
180            wait(this.threadWakeFrequency);
181          } catch (InterruptedException e) {
182            // restore the interrupt state
183            Thread.currentThread().interrupt();
184          }
185          // goto the beginning to check whether again whether we should fall through to roll
186          // several WALs, and also check whether we should quit.
187          continue;
188        }
189      }
190      try {
191        for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator(); iter
192          .hasNext();) {
193          Entry<WAL, RollController> entry = iter.next();
194          WAL wal = entry.getKey();
195          RollController controller = entry.getValue();
196          if (controller.isRollRequested()) {
197            // WAL roll requested, fall through
198            LOG.debug("WAL {} roll requested", wal);
199          } else if (controller.needsPeriodicRoll(now)) {
200            // Time for periodic roll, fall through
201            LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
202          } else {
203            continue;
204          }
205          Map<byte[], List<byte[]>> regionsToFlush = null;
206          int nAttempts = 0;
207          long startWaiting = EnvironmentEdgeManager.currentTime();
208          do {
209            try {
210              // Force the roll if the logroll.period is elapsed or if a roll was requested.
211              // The returned value is an collection of actual region and family names.
212              regionsToFlush = controller.rollWal(EnvironmentEdgeManager.currentTime());
213              break;
214            } catch (IOException ioe) {
215              if (ioe instanceof WALClosedException) {
216                LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", ioe);
217                iter.remove();
218                break;
219              }
220              long waitingTime = EnvironmentEdgeManager.currentTime() - startWaiting;
221              if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) {
222                nAttempts++;
223                LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
224                  + " last exception", nAttempts, waitingTime, ioe);
225                sleep(1000);
226              } else {
227                LOG.error("Roll wal failed and waiting timeout, will not retry", ioe);
228                throw ioe;
229              }
230            }
231          } while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout);
232          if (regionsToFlush != null) {
233            for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
234              scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
235            }
236          }
237        }
238      } catch (FailedLogCloseException | ConnectException e) {
239        abort("Failed log close in log roller", e);
240      } catch (IOException ex) {
241        // Abort if we get here. We probably won't recover an IOE. HBASE-1132
242        abort("IOE in log roller",
243          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
244      } catch (Exception ex) {
245        LOG.error("Log rolling failed", ex);
246        abort("Log rolling failed", ex);
247      }
248    }
249    LOG.info("LogRoller exiting.");
250  }
251
252  protected void afterWALArchive(Path oldPath, Path newPath) {
253  }
254
255  /**
256   * @param encodedRegionName Encoded name of region to flush.
257   * @param families          stores of region to flush.
258   */
259  protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families);
260
261  private boolean isWaiting() {
262    Thread.State state = getState();
263    return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
264  }
265
266  /** Returns true if all WAL roll finished */
267  public boolean walRollFinished() {
268    // TODO add a status field of roll in RollController
269    return wals.values().stream()
270      .noneMatch(rc -> rc.needsRoll(EnvironmentEdgeManager.currentTime())) && isWaiting();
271  }
272
273  /**
274   * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
275   */
276  public void waitUntilWalRollFinished() throws InterruptedException {
277    while (!walRollFinished()) {
278      Thread.sleep(100);
279    }
280  }
281
282  @Override
283  public void close() {
284    running = false;
285    interrupt();
286  }
287
288  /**
289   * Independently control the roll of each wal. When use multiwal, can avoid all wal roll together.
290   * see HBASE-24665 for detail
291   */
292  protected class RollController {
293    private final WAL wal;
294    private final AtomicBoolean rollRequest;
295    private long lastRollTime;
296
297    RollController(WAL wal) {
298      this.wal = wal;
299      this.rollRequest = new AtomicBoolean(false);
300      this.lastRollTime = EnvironmentEdgeManager.currentTime();
301    }
302
303    public void requestRoll() {
304      this.rollRequest.set(true);
305    }
306
307    public Map<byte[], List<byte[]>> rollWal(long now) throws IOException {
308      this.lastRollTime = now;
309      // reset the flag in front to avoid missing roll request before we return from rollWriter.
310      this.rollRequest.set(false);
311      return wal.rollWriter(true);
312    }
313
314    public boolean isRollRequested() {
315      return rollRequest.get();
316    }
317
318    public boolean needsPeriodicRoll(long now) {
319      return (now - this.lastRollTime) > rollPeriod;
320    }
321
322    public boolean needsRoll(long now) {
323      return isRollRequested() || needsPeriodicRoll(now);
324    }
325  }
326}