View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Map.Entry;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.locks.ReentrantLock;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.RemoteExceptionHandler;
32  import org.apache.hadoop.hbase.Server;
33  import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
34  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
35  import org.apache.hadoop.hbase.wal.WAL;
36  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.HasThread;
39  
40  import java.io.IOException;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import com.google.common.annotations.VisibleForTesting;
45  
46  /**
47   * Runs periodically to determine if the WAL should be rolled.
48   *
49   * NOTE: This class extends Thread rather than Chore because the sleep time
50   * can be interrupted when there is something to do, rather than the Chore
51   * sleep time which is invariant.
52   *
53   * TODO: change to a pool of threads
54   */
55  @InterfaceAudience.Private
56  @VisibleForTesting
57  public class LogRoller extends HasThread {
58    private static final Log LOG = LogFactory.getLog(LogRoller.class);
59    private final ReentrantLock rollLock = new ReentrantLock();
60    private final AtomicBoolean rollLog = new AtomicBoolean(false);
61    private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
62        new ConcurrentHashMap<WAL, Boolean>();
63    private final Server server;
64    protected final RegionServerServices services;
65    private volatile long lastrolltime = System.currentTimeMillis();
66    // Period to roll log.
67    private final long rollperiod;
68    private final int threadWakeFrequency;
69    // The interval to check low replication on hlog's pipeline
70    private long checkLowReplicationInterval;
71  
72    public void addWAL(final WAL wal) {
73      if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
74        wal.registerWALActionsListener(new WALActionsListener.Base() {
75          @Override
76          public void logRollRequested(boolean lowReplicas) {
77            walNeedsRoll.put(wal, Boolean.TRUE);
78            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
79            synchronized(rollLog) {
80              rollLog.set(true);
81              rollLog.notifyAll();
82            }
83          }
84        });
85      }
86    }
87  
88    public void requestRollAll() {
89      for (WAL wal : walNeedsRoll.keySet()) {
90        walNeedsRoll.put(wal, Boolean.TRUE);
91      }
92      synchronized(rollLog) {
93        rollLog.set(true);
94        rollLog.notifyAll();
95      }
96    }
97  
98    /** @param server */
99    public LogRoller(final Server server, final RegionServerServices services) {
100     super("LogRoller");
101     this.server = server;
102     this.services = services;
103     this.rollperiod = this.server.getConfiguration().
104       getLong("hbase.regionserver.logroll.period", 3600000);
105     this.threadWakeFrequency = this.server.getConfiguration().
106       getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
107     this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
108         "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
109   }
110 
111   @Override
112   public void interrupt() {
113     // Wake up if we are waiting on rollLog. For tests.
114     synchronized (rollLog) {
115       this.rollLog.notify();
116     }
117     super.interrupt();
118   }
119 
120   /**
121    * we need to check low replication in period, see HBASE-18132
122    */
123   void checkLowReplication(long now) {
124     try {
125       for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
126         WAL wal = entry.getKey();
127         boolean neeRollAlready = entry.getValue();
128         if(wal instanceof FSHLog && !neeRollAlready) {
129           FSHLog hlog = (FSHLog)wal;
130           if ((now - hlog.getLastTimeCheckLowReplication())
131               > this.checkLowReplicationInterval) {
132             hlog.checkLogRoll();
133           }
134         }
135       }
136     } catch (Throwable e) {
137       LOG.warn("Failed checking low replication", e);
138     }
139   }
140 
141   @Override
142   public void run() {
143     while (!server.isStopped()) {
144       long now = System.currentTimeMillis();
145       checkLowReplication(now);
146       boolean periodic = false;
147       if (!rollLog.get()) {
148         periodic = (now - this.lastrolltime) > this.rollperiod;
149         if (!periodic) {
150           synchronized (rollLog) {
151             try {
152               if (!rollLog.get()) {
153                 rollLog.wait(this.threadWakeFrequency);
154               }
155             } catch (InterruptedException e) {
156               // Fall through
157             }
158           }
159           continue;
160         }
161         // Time for periodic roll
162         if (LOG.isDebugEnabled()) {
163           LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
164         }
165       } else if (LOG.isDebugEnabled()) {
166         LOG.debug("WAL roll requested");
167       }
168       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
169       try {
170         this.lastrolltime = now;
171         for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
172           final WAL wal = entry.getKey();
173           // Force the roll if the logroll.period is elapsed or if a roll was requested.
174           // The returned value is an array of actual region names.
175           final byte [][] regionsToFlush = wal.rollWriter(periodic ||
176               entry.getValue().booleanValue());
177           walNeedsRoll.put(wal, Boolean.FALSE);
178           if (regionsToFlush != null) {
179             for (byte [] r: regionsToFlush) scheduleFlush(r);
180           }
181         }
182       } catch (FailedLogCloseException e) {
183         server.abort("Failed log close in log roller", e);
184       } catch (java.net.ConnectException e) {
185         server.abort("Failed log close in log roller", e);
186       } catch (IOException ex) {
187         LOG.fatal("Aborting", ex);
188         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
189         server.abort("IOE in log roller",
190           RemoteExceptionHandler.checkIOException(ex));
191       } catch (Exception ex) {
192         final String msg = "Failed rolling WAL; aborting to recover edits!";
193         LOG.error(msg, ex);
194         server.abort(msg, ex);
195       } finally {
196         try {
197           rollLog.set(false);
198         } finally {
199           rollLock.unlock();
200         }
201       }
202     }
203     LOG.info("LogRoller exiting.");
204   }
205 
206   /**
207    * @param encodedRegionName Encoded name of region to flush.
208    */
209   private void scheduleFlush(final byte [] encodedRegionName) {
210     boolean scheduled = false;
211     Region r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
212     FlushRequester requester = null;
213     if (r != null) {
214       requester = this.services.getFlushRequester();
215       if (requester != null) {
216         // force flushing all stores to clean old logs
217         requester.requestFlush(r, true);
218         scheduled = true;
219       }
220     }
221     if (!scheduled) {
222       LOG.warn("Failed to schedule flush of " +
223         Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
224         requester);
225     }
226   }
227 
228   /**
229    * For testing only
230    * @return true if all WAL roll finished
231    */
232   @VisibleForTesting
233   public boolean walRollFinished() {
234     for (boolean needRoll : walNeedsRoll.values()) {
235       if (needRoll) {
236         return false;
237       }
238     }
239     return true;
240   }
241 }