001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.net.ConnectException;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.atomic.AtomicBoolean;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
035import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
036import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
037import org.apache.hadoop.hbase.regionserver.wal.WALClosedException;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Runs periodically to determine if the WAL should be rolled.
046 * <p/>
047 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
048 * there is something to do, rather than the Chore sleep time which is invariant.
049 * <p/>
050 * The {@link #scheduleFlush(String, List)} is abstract here,
051 * as sometimes we hold a region without a region server but we still want to roll its WAL.
052 * <p/>
053 * TODO: change to a pool of threads
054 */
055@InterfaceAudience.Private
056public abstract class AbstractWALRoller<T extends Abortable> extends Thread
057  implements Closeable {
058  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);
059
060  protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
061
062  protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
063  protected final T abortable;
064  // Period to roll log.
065  private final long rollPeriod;
066  private final int threadWakeFrequency;
067  // The interval to check low replication on hlog's pipeline
068  private final long checkLowReplicationInterval;
069
070  private volatile boolean running = true;
071
072  public void addWAL(WAL wal) {
073    // check without lock first
074    if (wals.containsKey(wal)) {
075      return;
076    }
077    // this is to avoid race between addWAL and requestRollAll.
078    synchronized (this) {
079      if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
080        wal.registerWALActionsListener(new WALActionsListener() {
081          @Override
082          public void logRollRequested(WALActionsListener.RollRequestReason reason) {
083            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
084            synchronized (AbstractWALRoller.this) {
085              RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
086              controller.requestRoll();
087              AbstractWALRoller.this.notifyAll();
088            }
089          }
090
091          @Override
092          public void postLogArchive(Path oldPath, Path newPath) throws IOException {
093            afterWALArchive(oldPath, newPath);
094          }
095        });
096      }
097    }
098  }
099
100  public void requestRollAll() {
101    synchronized (this) {
102      for (RollController controller : wals.values()) {
103        controller.requestRoll();
104      }
105      notifyAll();
106    }
107  }
108
109  protected AbstractWALRoller(String name, Configuration conf, T abortable) {
110    super(name);
111    this.abortable = abortable;
112    this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
113    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
114    this.checkLowReplicationInterval =
115      conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
116  }
117
118  /**
119   * we need to check low replication in period, see HBASE-18132
120   */
121  private void checkLowReplication(long now) {
122    try {
123      for (Entry<WAL, RollController> entry : wals.entrySet()) {
124        WAL wal = entry.getKey();
125        boolean needRollAlready = entry.getValue().needsRoll(now);
126        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
127          continue;
128        }
129        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
130      }
131    } catch (Throwable e) {
132      LOG.warn("Failed checking low replication", e);
133    }
134  }
135
136  private void abort(String reason, Throwable cause) {
137    // close all WALs before calling abort on RS.
138    // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
139    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
140    // is already broken.
141    for (WAL wal : wals.keySet()) {
142      // shutdown rather than close here since we are going to abort the RS and the wals need to be
143      // split when recovery
144      try {
145        wal.shutdown();
146      } catch (IOException e) {
147        LOG.warn("Failed to shutdown wal", e);
148      }
149    }
150    abortable.abort(reason, cause);
151  }
152
153  @Override
154  public void run() {
155    while (running) {
156      long now = System.currentTimeMillis();
157      checkLowReplication(now);
158      synchronized (this) {
159        if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
160          try {
161            wait(this.threadWakeFrequency);
162          } catch (InterruptedException e) {
163            // restore the interrupt state
164            Thread.currentThread().interrupt();
165          }
166          // goto the beginning to check whether again whether we should fall through to roll
167          // several WALs, and also check whether we should quit.
168          continue;
169        }
170      }
171      try {
172        for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator();
173             iter.hasNext();) {
174          Entry<WAL, RollController> entry = iter.next();
175          WAL wal = entry.getKey();
176          RollController controller = entry.getValue();
177          if (controller.isRollRequested()) {
178            // WAL roll requested, fall through
179            LOG.debug("WAL {} roll requested", wal);
180          } else if (controller.needsPeriodicRoll(now)){
181            // Time for periodic roll, fall through
182            LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
183          } else {
184            continue;
185          }
186          try {
187            // Force the roll if the logroll.period is elapsed or if a roll was requested.
188            // The returned value is an collection of actual region and family names.
189            Map<byte[], List<byte[]>> regionsToFlush = controller.rollWal(now);
190            if (regionsToFlush != null) {
191              for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
192                scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
193              }
194            }
195          } catch (WALClosedException e) {
196            LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
197            iter.remove();
198          }
199        }
200      } catch (FailedLogCloseException | ConnectException e) {
201        abort("Failed log close in log roller", e);
202      } catch (IOException ex) {
203        // Abort if we get here. We probably won't recover an IOE. HBASE-1132
204        abort("IOE in log roller",
205          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
206      } catch (Exception ex) {
207        LOG.error("Log rolling failed", ex);
208        abort("Log rolling failed", ex);
209      }
210    }
211    LOG.info("LogRoller exiting.");
212  }
213
214  protected void afterWALArchive(Path oldPath, Path newPath) {
215  }
216
217  /**
218   * @param encodedRegionName Encoded name of region to flush.
219   * @param families stores of region to flush.
220   */
221  protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families);
222
223  private boolean isWaiting() {
224    Thread.State state = getState();
225    return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
226  }
227
228  /**
229   * @return true if all WAL roll finished
230   */
231  public boolean walRollFinished() {
232    // TODO add a status field of roll in RollController
233    return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis()))
234      && isWaiting();
235  }
236
237  /**
238   * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
239   */
240  public void waitUntilWalRollFinished() throws InterruptedException {
241    while (!walRollFinished()) {
242      Thread.sleep(100);
243    }
244  }
245
246  @Override
247  public void close() {
248    running = false;
249    interrupt();
250  }
251
252  /**
253   * Independently control the roll of each wal. When use multiwal,
254   * can avoid all wal roll together. see HBASE-24665 for detail
255   */
256  protected class RollController {
257    private final WAL wal;
258    private final AtomicBoolean rollRequest;
259    private long lastRollTime;
260
261    RollController(WAL wal) {
262      this.wal = wal;
263      this.rollRequest = new AtomicBoolean(false);
264      this.lastRollTime = System.currentTimeMillis();
265    }
266
267    public void requestRoll() {
268      this.rollRequest.set(true);
269    }
270
271    public Map<byte[], List<byte[]>> rollWal(long now) throws IOException {
272      this.lastRollTime = now;
273      // reset the flag in front to avoid missing roll request before we return from rollWriter.
274      this.rollRequest.set(false);
275      return wal.rollWriter(true);
276    }
277
278    public boolean isRollRequested() {
279      return rollRequest.get();
280    }
281
282    public boolean needsPeriodicRoll(long now) {
283      return (now - this.lastRollTime) > rollPeriod;
284    }
285
286    public boolean needsRoll(long now) {
287      return isRollRequested() || needsPeriodicRoll(now);
288    }
289  }
290}