/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Runs periodically to determine if the WAL should be rolled.
 *
 * NOTE: This class extends HasThread rather than Chore because the sleep time
 * can be interrupted when there is something to do, rather than the Chore
 * sleep time, which is invariant.
 *
 * TODO: change to a pool of threads
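 *
 * A rough, illustrative sketch of how a region server might drive this roller; the
 * {@code server}, {@code services} and {@code wal} variables below are placeholders,
 * not actual HRegionServer fields:
 * <pre>{@code
 * LogRoller roller = new LogRoller(server, services);
 * roller.addWAL(wal);        // watch this WAL and wake up when it asks to be rolled
 * roller.start();            // start the periodic roll loop (HasThread)
 * // ... later, to force an immediate roll of every registered WAL:
 * roller.requestRollAll();
 * // ... and on shutdown:
 * roller.close();
 * }</pre>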
 */
@InterfaceAudience.Private
@VisibleForTesting
public class LogRoller extends HasThread implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
  private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
  private final Server server;
  protected final RegionServerServices services;
  private volatile long lastRollTime = System.currentTimeMillis();
  // Period to roll log.
  private final long rollPeriod;
  private final int threadWakeFrequency;
  // The interval at which to check for low replication on the WAL pipeline
  private long checkLowReplicationInterval;

  private volatile boolean running = true;

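  /**
   * Register a WAL with this roller and attach a listener that marks it as needing a roll (and
   * wakes the roller thread) whenever the WAL requests one. Calling this more than once for the
   * same WAL has no further effect.
   */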
  public void addWAL(WAL wal) {
    // check without lock first
    if (walNeedsRoll.containsKey(wal)) {
      return;
    }
    // this is to avoid race between addWAL and requestRollAll.
    synchronized (this) {
      if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
        wal.registerWALActionsListener(new WALActionsListener() {
          @Override
          public void logRollRequested(boolean lowReplicas) {
            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
            synchronized (LogRoller.this) {
              walNeedsRoll.put(wal, Boolean.TRUE);
              LogRoller.this.notifyAll();
            }
          }
        });
      }
    }
  }

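  /**
   * Mark every WAL registered with this roller as needing a roll and wake the roller thread.
   */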
  public void requestRollAll() {
    synchronized (this) {
      List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
      for (WAL wal : wals) {
        walNeedsRoll.put(wal, Boolean.TRUE);
      }
      notifyAll();
    }
  }

  /**
   * @param server the region server; supplies the configuration and is aborted if rolling fails
   * @param services used to look up regions and request flushes after a roll
   */
  public LogRoller(final Server server, final RegionServerServices services) {
    super("LogRoller");
    this.server = server;
    this.services = services;
    this.rollPeriod = this.server.getConfiguration().
      getLong("hbase.regionserver.logroll.period", 3600000);
    this.threadWakeFrequency = this.server.getConfiguration().
      getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
        "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
  }

  /**
   * We need to check for low replication periodically; see HBASE-18132.
   */
  private void checkLowReplication(long now) {
    try {
      for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
        WAL wal = entry.getKey();
        boolean needRollAlready = entry.getValue();
        if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
          continue;
        }
        ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
      }
    } catch (Throwable e) {
      LOG.warn("Failed checking low replication", e);
    }
  }

  private void abort(String reason, Throwable cause) {
    // close all WALs before calling abort on RS.
    // This is because AsyncFSWAL relies on us for rolling a new writer to make progress, and if we
    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
    // is already broken.
    for (WAL wal : walNeedsRoll.keySet()) {
      // shutdown rather than close here since we are going to abort the RS and the wals need to be
      // split during recovery
      try {
        wal.shutdown();
      } catch (IOException e) {
        LOG.warn("Failed to shutdown wal", e);
      }
    }
    server.abort(reason, cause);
  }

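  /**
   * Main loop: wake up on a roll request or after the wake frequency elapses, check for low
   * replication, roll the WALs that asked for it (or all of them once the roll period has
   * elapsed), and schedule a flush for each region name that rollWriter reports back.
   */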
  @Override
  public void run() {
    while (running) {
      boolean periodic = false;
      long now = System.currentTimeMillis();
      checkLowReplication(now);
      periodic = (now - this.lastRollTime) > this.rollPeriod;
      if (periodic) {
        // Time for periodic roll, fall through
        LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
      } else {
        synchronized (this) {
          if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
            // WAL roll requested, fall through
            LOG.debug("WAL roll requested");
          } else {
            try {
              wait(this.threadWakeFrequency);
            } catch (InterruptedException e) {
              // restore the interrupt state
              Thread.currentThread().interrupt();
            }
            // go back to the beginning to check again whether we should fall through to roll
            // several WALs, and also check whether we should quit.
            continue;
          }
        }
      }
      try {
        this.lastRollTime = System.currentTimeMillis();
        for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
            .hasNext();) {
          Entry<WAL, Boolean> entry = iter.next();
          WAL wal = entry.getKey();
          // reset the flag in front to avoid missing roll request before we return from rollWriter.
          walNeedsRoll.put(wal, Boolean.FALSE);
          // Force the roll if the logroll.period has elapsed or if a roll was requested.
          // The returned value is an array of encoded region names to flush.
          byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
          if (regionsToFlush != null) {
            for (byte[] r : regionsToFlush) {
              scheduleFlush(Bytes.toString(r));
            }
          }
        }
      } catch (FailedLogCloseException | ConnectException e) {
        abort("Failed log close in log roller", e);
      } catch (IOException ex) {
        // Abort if we get here. We probably won't recover from an IOE. HBASE-1132
        abort("IOE in log roller",
          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
      } catch (Exception ex) {
        LOG.error("Log rolling failed", ex);
        abort("Log rolling failed", ex);
      }
    }
    LOG.info("LogRoller exiting.");
  }

  /**
   * @param encodedRegionName Encoded name of region to flush.
   */
  private void scheduleFlush(String encodedRegionName) {
    HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
    if (r == null) {
      LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
      return;
    }
    FlushRequester requester = this.services.getFlushRequester();
    if (requester == null) {
      LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
        encodedRegionName, r);
      return;
    }
    // force flushing all stores to clean old logs
    requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
  }

  /**
   * For testing only.
   * @return true if all WAL rolls have finished
   */
  @VisibleForTesting
  public boolean walRollFinished() {
    return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
  }

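  /**
   * Stop the roller: clears the running flag and interrupts the thread so any pending wait ends.
   */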
  @Override
  public void close() {
    running = false;
    interrupt();
  }
}