001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.util.ArrayList;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Map.Entry;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
033import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
034import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
035import org.apache.hadoop.hbase.regionserver.wal.WALClosedException;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.HasThread;
038import org.apache.hadoop.hbase.wal.WAL;
039import org.apache.hadoop.ipc.RemoteException;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
045
046/**
047 * Runs periodically to determine if the WAL should be rolled.
048 *
049 * NOTE: This class extends Thread rather than Chore because the sleep time
050 * can be interrupted when there is something to do, rather than the Chore
051 * sleep time which is invariant.
052 *
053 * TODO: change to a pool of threads
054 */
055@InterfaceAudience.Private
056@VisibleForTesting
057public class LogRoller extends HasThread implements Closeable {
058  private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
059  private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
060  protected final RegionServerServices services;
061  private volatile long lastRollTime = System.currentTimeMillis();
062  // Period to roll log.
063  private final long rollPeriod;
064  private final int threadWakeFrequency;
065  // The interval to check low replication on hlog's pipeline
066  private long checkLowReplicationInterval;
067
068  private volatile boolean running = true;
069
070  public void addWAL(WAL wal) {
071    // check without lock first
072    if (walNeedsRoll.containsKey(wal)) {
073      return;
074    }
075    // this is to avoid race between addWAL and requestRollAll.
076    synchronized (this) {
077      if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
078        wal.registerWALActionsListener(new WALActionsListener() {
079          @Override
080          public void logRollRequested(WALActionsListener.RollRequestReason reason) {
081            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
082            synchronized (LogRoller.this) {
083              walNeedsRoll.put(wal, Boolean.TRUE);
084              LogRoller.this.notifyAll();
085            }
086          }
087        });
088      }
089    }
090  }
091
092  public void requestRollAll() {
093    synchronized (this) {
094      List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
095      for (WAL wal : wals) {
096        walNeedsRoll.put(wal, Boolean.TRUE);
097      }
098      notifyAll();
099    }
100  }
101
102  public LogRoller(RegionServerServices services) {
103    super("LogRoller");
104    this.services = services;
105    this.rollPeriod = this.services.getConfiguration().
106      getLong("hbase.regionserver.logroll.period", 3600000);
107    this.threadWakeFrequency = this.services.getConfiguration().
108      getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
109    this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
110        "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
111  }
112
113  /**
114   * we need to check low replication in period, see HBASE-18132
115   */
116  private void checkLowReplication(long now) {
117    try {
118      for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
119        WAL wal = entry.getKey();
120        boolean needRollAlready = entry.getValue();
121        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
122          continue;
123        }
124        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
125      }
126    } catch (Throwable e) {
127      LOG.warn("Failed checking low replication", e);
128    }
129  }
130
131  private void abort(String reason, Throwable cause) {
132    // close all WALs before calling abort on RS.
133    // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
134    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
135    // is already broken.
136    for (WAL wal : walNeedsRoll.keySet()) {
137      // shutdown rather than close here since we are going to abort the RS and the wals need to be
138      // split when recovery
139      try {
140        wal.shutdown();
141      } catch (IOException e) {
142        LOG.warn("Failed to shutdown wal", e);
143      }
144    }
145    this.services.abort(reason, cause);
146  }
147
148  @Override
149  public void run() {
150    while (running) {
151      boolean periodic = false;
152      long now = System.currentTimeMillis();
153      checkLowReplication(now);
154      periodic = (now - this.lastRollTime) > this.rollPeriod;
155      if (periodic) {
156        // Time for periodic roll, fall through
157        LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
158      } else {
159        synchronized (this) {
160          if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
161            // WAL roll requested, fall through
162            LOG.debug("WAL roll requested");
163          } else {
164            try {
165              wait(this.threadWakeFrequency);
166            } catch (InterruptedException e) {
167              // restore the interrupt state
168              Thread.currentThread().interrupt();
169            }
170            // goto the beginning to check whether again whether we should fall through to roll
171            // several WALs, and also check whether we should quit.
172            continue;
173          }
174        }
175      }
176      try {
177        this.lastRollTime = System.currentTimeMillis();
178        for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
179            .hasNext();) {
180          Entry<WAL, Boolean> entry = iter.next();
181          WAL wal = entry.getKey();
182          // reset the flag in front to avoid missing roll request before we return from rollWriter.
183          walNeedsRoll.put(wal, Boolean.FALSE);
184          byte[][] regionsToFlush = null;
185          try {
186            // Force the roll if the logroll.period is elapsed or if a roll was requested.
187            // The returned value is an array of actual region names.
188            regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
189          } catch (WALClosedException e) {
190            LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
191            iter.remove();
192          }
193          if (regionsToFlush != null) {
194            for (byte[] r : regionsToFlush) {
195              scheduleFlush(Bytes.toString(r));
196            }
197          }
198        }
199      } catch (FailedLogCloseException | ConnectException e) {
200        abort("Failed log close in log roller", e);
201      } catch (IOException ex) {
202        // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
203        abort("IOE in log roller",
204          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
205      } catch (Exception ex) {
206        LOG.error("Log rolling failed", ex);
207        abort("Log rolling failed", ex);
208      }
209    }
210    LOG.info("LogRoller exiting.");
211  }
212
213  /**
214   * @param encodedRegionName Encoded name of region to flush.
215   */
216  private void scheduleFlush(String encodedRegionName) {
217    HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
218    if (r == null) {
219      LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
220      return;
221    }
222    FlushRequester requester = this.services.getFlushRequester();
223    if (requester == null) {
224      LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
225        encodedRegionName, r);
226      return;
227    }
228    // force flushing all stores to clean old logs
229    requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
230  }
231
232  /**
233   * @return true if all WAL roll finished
234   */
235  public boolean walRollFinished() {
236    return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
237  }
238
239  /**
240   * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
241   */
242  public void waitUntilWalRollFinished() throws InterruptedException {
243    while (!walRollFinished()) {
244      Thread.sleep(100);
245    }
246  }
247
248  @Override
249  public void close() {
250    running = false;
251    interrupt();
252  }
253
254  @VisibleForTesting
255  Map<WAL, Boolean> getWalNeedsRoll() {
256    return this.walNeedsRoll;
257  }
258}