001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.net.ConnectException;
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map.Entry;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Abortable;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
033import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
034import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.ipc.RemoteException;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Runs periodically to determine if the WAL should be rolled.
043 * <p/>
044 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
045 * there is something to do, rather than the Chore sleep time which is invariant.
046 * <p/>
047 * The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a
048 * region server but we still want to roll its WAL.
049 * <p/>
050 * TODO: change to a pool of threads
051 */
052@InterfaceAudience.Private
053public abstract class AbstractWALRoller<T extends Abortable> extends Thread
054  implements Closeable {
055  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);
056
057  protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
058
059  protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
060  protected final T abortable;
061  private volatile long lastRollTime = System.currentTimeMillis();
062  // Period to roll log.
063  private final long rollPeriod;
064  private final int threadWakeFrequency;
065  // The interval to check low replication on hlog's pipeline
066  private long checkLowReplicationInterval;
067
068  private volatile boolean running = true;
069
070  public void addWAL(WAL wal) {
071    // check without lock first
072    if (walNeedsRoll.containsKey(wal)) {
073      return;
074    }
075    // this is to avoid race between addWAL and requestRollAll.
076    synchronized (this) {
077      if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
078        wal.registerWALActionsListener(new WALActionsListener() {
079          @Override
080          public void logRollRequested(WALActionsListener.RollRequestReason reason) {
081            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
082            synchronized (AbstractWALRoller.this) {
083              walNeedsRoll.put(wal, Boolean.TRUE);
084              AbstractWALRoller.this.notifyAll();
085            }
086          }
087        });
088      }
089    }
090  }
091
092  public void requestRollAll() {
093    synchronized (this) {
094      List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
095      for (WAL wal : wals) {
096        walNeedsRoll.put(wal, Boolean.TRUE);
097      }
098      notifyAll();
099    }
100  }
101
102  protected AbstractWALRoller(String name, Configuration conf, T abortable) {
103    super(name);
104    this.abortable = abortable;
105    this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
106    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
107    this.checkLowReplicationInterval =
108      conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
109  }
110
111  /**
112   * we need to check low replication in period, see HBASE-18132
113   */
114  private void checkLowReplication(long now) {
115    try {
116      for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
117        WAL wal = entry.getKey();
118        boolean needRollAlready = entry.getValue();
119        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
120          continue;
121        }
122        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
123      }
124    } catch (Throwable e) {
125      LOG.warn("Failed checking low replication", e);
126    }
127  }
128
129  private void abort(String reason, Throwable cause) {
130    // close all WALs before calling abort on RS.
131    // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
132    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
133    // is already broken.
134    for (WAL wal : walNeedsRoll.keySet()) {
135      // shutdown rather than close here since we are going to abort the RS and the wals need to be
136      // split when recovery
137      try {
138        wal.shutdown();
139      } catch (IOException e) {
140        LOG.warn("Failed to shutdown wal", e);
141      }
142    }
143    abortable.abort(reason, cause);
144  }
145
146  @Override
147  public void run() {
148    while (running) {
149      boolean periodic = false;
150      long now = System.currentTimeMillis();
151      checkLowReplication(now);
152      periodic = (now - this.lastRollTime) > this.rollPeriod;
153      if (periodic) {
154        // Time for periodic roll, fall through
155        LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
156      } else {
157        synchronized (this) {
158          if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
159            // WAL roll requested, fall through
160            LOG.debug("WAL roll requested");
161          } else {
162            try {
163              wait(this.threadWakeFrequency);
164            } catch (InterruptedException e) {
165              // restore the interrupt state
166              Thread.currentThread().interrupt();
167            }
168            // goto the beginning to check whether again whether we should fall through to roll
169            // several WALs, and also check whether we should quit.
170            continue;
171          }
172        }
173      }
174      try {
175        this.lastRollTime = System.currentTimeMillis();
176        for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
177          .hasNext();) {
178          Entry<WAL, Boolean> entry = iter.next();
179          WAL wal = entry.getKey();
180          // reset the flag in front to avoid missing roll request before we return from rollWriter.
181          walNeedsRoll.put(wal, Boolean.FALSE);
182          // Force the roll if the logroll.period is elapsed or if a roll was requested.
183          // The returned value is an array of actual region names.
184          byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
185          if (regionsToFlush != null) {
186            for (byte[] r : regionsToFlush) {
187              scheduleFlush(Bytes.toString(r));
188            }
189          }
190          afterRoll(wal);
191        }
192      } catch (FailedLogCloseException | ConnectException e) {
193        abort("Failed log close in log roller", e);
194      } catch (IOException ex) {
195        // Abort if we get here. We probably won't recover an IOE. HBASE-1132
196        abort("IOE in log roller",
197          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
198      } catch (Exception ex) {
199        LOG.error("Log rolling failed", ex);
200        abort("Log rolling failed", ex);
201      }
202    }
203    LOG.info("LogRoller exiting.");
204  }
205
206  /**
207   * Called after we finish rolling the give {@code wal}.
208   */
209  protected void afterRoll(WAL wal) {
210  }
211
212  /**
213   * @param encodedRegionName Encoded name of region to flush.
214   */
215  protected abstract void scheduleFlush(String encodedRegionName);
216
217  private boolean isWaiting() {
218    Thread.State state = getState();
219    return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
220  }
221
222  /**
223   * @return true if all WAL roll finished
224   */
225  public boolean walRollFinished() {
226    return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
227  }
228
229  /**
230   * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
231   */
232  public void waitUntilWalRollFinished() throws InterruptedException {
233    while (!walRollFinished()) {
234      Thread.sleep(100);
235    }
236  }
237
238  @Override
239  public void close() {
240    running = false;
241    interrupt();
242  }
243}