View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Map.Entry;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.locks.ReentrantLock;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.RemoteExceptionHandler;
32  import org.apache.hadoop.hbase.Server;
33  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
34  import org.apache.hadoop.hbase.wal.WAL;
35  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.HasThread;
38  
39  import java.io.IOException;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import com.google.common.annotations.VisibleForTesting;
44  
45  /**
46   * Runs periodically to determine if the WAL should be rolled.
47   *
48   * NOTE: This class extends Thread rather than Chore because the sleep time
49   * can be interrupted when there is something to do, rather than the Chore
50   * sleep time which is invariant.
51   *
52   * TODO: change to a pool of threads
53   */
54  @InterfaceAudience.Private
55  @VisibleForTesting
56  public class LogRoller extends HasThread {
57    static final Log LOG = LogFactory.getLog(LogRoller.class);
58    private final ReentrantLock rollLock = new ReentrantLock();
59    private final AtomicBoolean rollLog = new AtomicBoolean(false);
60    private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
61        new ConcurrentHashMap<WAL, Boolean>();
62    private final Server server;
63    protected final RegionServerServices services;
64    private volatile long lastrolltime = System.currentTimeMillis();
65    // Period to roll log.
66    private final long rollperiod;
67    private final int threadWakeFrequency;
68  
69    public void addWAL(final WAL wal) {
70      if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
71        wal.registerWALActionsListener(new WALActionsListener.Base() {
72          @Override
73          public void logRollRequested(boolean lowReplicas) {
74            walNeedsRoll.put(wal, Boolean.TRUE);
75            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
76            synchronized(rollLog) {
77              rollLog.set(true);
78              rollLog.notifyAll();
79            }
80          }
81        });
82      }
83    }
84  
85    public void requestRollAll() {
86      for (WAL wal : walNeedsRoll.keySet()) {
87        walNeedsRoll.put(wal, Boolean.TRUE);
88      }
89      synchronized(rollLog) {
90        rollLog.set(true);
91        rollLog.notifyAll();
92      }
93    }
94  
95    /** @param server */
96    public LogRoller(final Server server, final RegionServerServices services) {
97      super();
98      this.server = server;
99      this.services = services;
100     this.rollperiod = this.server.getConfiguration().
101       getLong("hbase.regionserver.logroll.period", 3600000);
102     this.threadWakeFrequency = this.server.getConfiguration().
103       getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
104   }
105 
106   @Override
107   public void interrupt() {
108     // Wake up if we are waiting on rollLog. For tests.
109     synchronized (rollLog) {
110       this.rollLog.notify();
111     }
112     super.interrupt();
113   }
114 
115   @Override
116   public void run() {
117     while (!server.isStopped()) {
118       long now = System.currentTimeMillis();
119       boolean periodic = false;
120       if (!rollLog.get()) {
121         periodic = (now - this.lastrolltime) > this.rollperiod;
122         if (!periodic) {
123           synchronized (rollLog) {
124             try {
125               if (!rollLog.get()) {
126                 rollLog.wait(this.threadWakeFrequency);
127               }
128             } catch (InterruptedException e) {
129               // Fall through
130             }
131           }
132           continue;
133         }
134         // Time for periodic roll
135         if (LOG.isDebugEnabled()) {
136           LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
137         }
138       } else if (LOG.isDebugEnabled()) {
139         LOG.debug("WAL roll requested");
140       }
141       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
142       try {
143         this.lastrolltime = now;
144         for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
145           final WAL wal = entry.getKey();
146           // Force the roll if the logroll.period is elapsed or if a roll was requested.
147           // The returned value is an array of actual region names.
148           final byte [][] regionsToFlush = wal.rollWriter(periodic ||
149               entry.getValue().booleanValue());
150           walNeedsRoll.put(wal, Boolean.FALSE);
151           if (regionsToFlush != null) {
152             for (byte [] r: regionsToFlush) scheduleFlush(r);
153           }
154         }
155       } catch (FailedLogCloseException e) {
156         server.abort("Failed log close in log roller", e);
157       } catch (java.net.ConnectException e) {
158         server.abort("Failed log close in log roller", e);
159       } catch (IOException ex) {
160         LOG.fatal("Aborting", ex);
161         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
162         server.abort("IOE in log roller",
163           RemoteExceptionHandler.checkIOException(ex));
164       } catch (Exception ex) {
165         final String msg = "Failed rolling WAL; aborting to recover edits!";
166         LOG.error(msg, ex);
167         server.abort(msg, ex);
168       } finally {
169         try {
170           rollLog.set(false);
171         } finally {
172           rollLock.unlock();
173         }
174       }
175     }
176     LOG.info("LogRoller exiting.");
177   }
178 
179   /**
180    * @param encodedRegionName Encoded name of region to flush.
181    */
182   private void scheduleFlush(final byte [] encodedRegionName) {
183     boolean scheduled = false;
184     Region r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
185     FlushRequester requester = null;
186     if (r != null) {
187       requester = this.services.getFlushRequester();
188       if (requester != null) {
189         // force flushing all stores to clean old logs
190         requester.requestFlush(r, true);
191         scheduled = true;
192       }
193     }
194     if (!scheduled) {
195       LOG.warn("Failed to schedule flush of " +
196         Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
197         requester);
198     }
199   }
200 }